Rust에서의 비동기 프로그래밍

이동훈·2021년 6월 8일
2
post-thumbnail

RUST 로 공부하는 비동기 프로그래밍 시리즈

백엔드 개발자라면 비동기프로그래밍에 대해서 정말 많은 이야기를 들어보셨을거에요. 면접 단골 질문인 Node.js의 Event Loop부터 GoLang의 Goroutine까지 정말
비동기 라는 말을 안 들어봤으면 백엔드 개발자가 아니라는 말도 있을 정도니 정말 중요한 주제임을 틀림이 없는것 같습니다. 그럼 과연 비동기 프로그래밍은 무엇이고 왜 알아야하고 어떻게 사용하는게 좋을까요? 저는 이 시리즈에서 비동기 프로그래밍에 대해서 간략한소개를 해볼까 합니다. 비동기 프로그래밍을 소개한 다른 글은 많지만 이 시리즈는 개념부터 실제로 비동기 런타임을 코드로 구현을 해서 최대한 자세하기 설명 하는것을 목표로 합니다. 제가 지금 현업에서 러스트를 사용하고 있고 러스트를 좋아하기 때문에 이 글에서의 코딩 부분은 러스트로 할 예정입니다~

​ 먼저 시리즈의 순서는

  1. 비동기 프로그래밍은 무엇인가?

  2. Scheduler 이해하기

  3. Rust에서의 비동기 프로그래밍

  4. Rust Tokio 이해하기

입니다. 이 중 오늘은 Rust에서의 비동기 프로그래밍에 대해서 알아보겠습니다. 틀린 부분이나 설명이 모호한 부분이 있다면 댓글 혹은 이메일로 알려주시면 감사하겠습니다~

Rust에서의 Scheduler 구현해보기

1, 2편을 통해 저희는 비동기 프로그래밍과 그 핵심인 Scheduler에 관해서 간단하게 알아보았습니다. 이 내용들을 바탕으로 이번 편에서는 Rust를 사용해서 실제로 저희만의 조그만 Scheduler를 구현해볼 생각입니다. 언어는 러스트이지만 러스트에 대한 기본적인 지식과 C/C++ 로 간단한 프로젝트를 진행해본 경험이 있으시다면 어렵게 다가오지는 않을거라고 저는 믿습니다!(러스트는 좋습니닿ㅎㅎㅎ) 이 튜토리얼은 제가 만든것이 아닌 https://cfsamson.github.io/book-exploring-async-basics/introduction.html에 기반하고 있습니다. 코드에 대한 더 자세한 설명을 알고 싶다시면 정독하시는걸 추천합니다!

먼저 실제로 코드를 짜기에 앞서 저희가 만들려고 하는 스케쥴러가 수행해야 하는 일들을 미리 한번 정의해보겠습니다.

  1. File read
  2. Encrpyt
  3. Network Call

보시다시피 1,3번은 I/O Intensive Task , 2번은 CPU Intenstive Task입니다. 실제로는 훨씬 더 많은 작업의 종류가 있겠지만 편의를 위해서 위 세 가지 작업만 지원하는 스케쥴러를 한번 만들어보겠습니다~

2편을 기억하신다면 제가 Scheduler 를 설명하면서 I/O intenstive task 같은 경우 작업이 끝나경우 CPU 가 해당 Task가 끝날걸 interrupt signal을 통해 알수 있다고 말한걸 아시고 계실겁니다. 이 부분에 대한 설명이 조금 모호해서 여기서 추가적인 설명을 하면 좋을 것 같습니다. 대부분의 모던 OS 에서는 OS 자체적으로 Event queue를 지원합니다! 즉, 저희가 전편에서 이야기했던 I/O Task들을 맡기고 Task가 끝나면 interrupt signal을 받는 부분을 OS가 자체적으로 구현을 하고 있습니다. 그렇기 때문에 저희는 OS에서 지원을 하는 이런 Event queue를 사용 해서 저희의 event queue를 대체할수 있습니다! 물론 OS 마다 api 가 달라 OS agnositc 한 api를 사용할수는 없습니다. 예를 들어 UNIX 기반 OS들은 Epoll을, Mac은 Kqueue,Windows 는 IOCP 의 api 를 가지고 있는데 다행히도 Cross platform api를 제공하는 라이브러리들이 있기에 저희는 라이브러리를 가져가다가 사용하면 됩니다. 여담으로 Node는 이 cross platorm library로 libc를 사용하고 있고 Rust의 경우 Mio 가 대표적인 Cross platform library 입니다. 다만 Mio를 그대로 가져다가 쓰기에는 저희의 코드가 조금 복잡해질수 있어서 이 글에서는 https://github.com/cfsamson/examples-minimio를 통해서 맛보기 수준의 Runtime 을 만들어보려고 합니다!

갑자기 Runtime 이란 용어가 나와서 당황스러우시다고요? 걱정하지 마세요. 사실 Scheduler를 정의하기에 따라 단순히 Task들을 분배하는 로직! 이라고 작게 정의할수 있고 저희처럼 OS랑 통신을 하면서 epoll 로 Event 를 등록하고 그 결과를 받아와서 Task들을 분배하는것이라고 크게 정의할수도 있습니다. 지금까지 1,2편에서는 저희는 Scheduler의 정의로 후자를 사용했는데 후자의 경우 Runtime 이라고 부르기도 합니다. 즉, 이번 편에선 만들 Runtime은 저희가 지금까지 배웠던 것들을 한번에 실제 코드로 만들어보는 과정입니다!

그리고 코드에서 보실수 있듯이 이 Runtime 는 async/await syntax가 아닌 callback 형식의 Runtime입니다. 이는 async/await 를 알기 위해서는 Rust의 Futures 에 대해 알아야 하는데 Futures 는 이 자체로 시리즈를 통해 설명이 필요한 주제여서 이번 시리즈에서는 다루지 않을 예정입니다~ 나중에 기회가 되면 Rust의 Futures 에 관해서도 글을 써보도록 하겠습니다!

Helper Structs

먼저 Runtime 을 실제 구현하기에 앞서 Runtime 안에서 사용될 struct 들을 먼저 간단히 정리하고 넘어가겠습니다.

/// 이 예제에서 mock 하고자 하는것이 JS의 Node Runtime 이어서 task 들은 가상의 Js function을 표현하기 위해서 Fn() -> Js 표현이 사용되었습니다. 
struct Task {
    task: Box<dyn Fn() -> Js + Send + 'static>,
    callback_id: usize,
    kind: ThreadPoolTaskKind,
}

/// 여기서 close 는 empty task 를 만들어서 각 thread에게 종료 시그널을 보내는 용도로 사용됩니다. 
impl Task {
    fn close() -> Self {
        Task {
            task: Box::new(|| Js::Undefined),
            callback_id: 0,
            kind: ThreadPoolTaskKind::Close,
        }
    }
}

/// 위에서 설명했듯이 저희의 Runtime 은 딱 세 가지로 작업만을 지원합니다. 
pub enum ThreadPoolTaskKind {
    FileRead,
    Encrypt,
    Close,
}

/// 각 native thread에 대한 sender를 들고 있는 struct입니다. 
#[derive(Debug)]
struct NodeThread {
    pub(crate) handle: JoinHandle<()>,
    sender: Sender<Task>,
}


/// epoll event loop이 처리하는 세 가지 작업을 표현하는 enum 입니다. 
enum PollEvent {
    /// An event from the `threadpool` with a tuple containing the `thread id`,
    /// the `callback_id` and the data which the we expect to process in our
    /// callback
    Threadpool((usize, usize, Js)),
    /// An event from the epoll-based eventloop holding the `event_id` for the
    /// event
    Epoll(usize),
    Timeout,
}

Runtime::new() 구현하기

먼저 저희의 Runtime에 대략적인 구조는 아레와 같습니다. 먼저 N개의 OS Native Thread 를 만든 다음, 각각의 스레드에 channel 을 만들어서 스레드는 channel의 receiver를 소유하고 있어서 이 receiver로 Close 라는 메세지가 오기 전까지 들어오는 event(task) 들을 순차적으로 처리합니다. 그리고 만약 callback 이 있다면 다시 event 형태로 만들어서 같은 스레드로 보내게 됩니다(이 부분은 실제 tokio를 비롯한 스케쥴러와는 다른데 실제 스케쥴러에서는 callback event가 해당 스레도에서 처리를 할수도 그렇지 않을수도 있습니다! 이 부분은 2편에서 다루었던 Fair Scheduling과 관련된 부분입니다). Runtime에는 native threads외에 epoll thread가 있어서 이 스레드가 minimio와 통신을 하면서 OS event queue 에 작업을 맡기고 그 결과를 받아와서 처리가 필요하면 OS native thread로 보내게 됩니다.

그러면 먼저 Runtime struct를 같이 보시겠습니다.

pub struct Runtime {
    /// Available threads for the threadpool
    available_threads: Vec<usize>,
    /// Callbacks scheduled to run
    callbacks_to_run: Vec<(usize, Js)>,
    /// All registered callbacks
    callback_queue: HashMap<usize, Box<dyn FnOnce(Js)>>,
    /// Number of pending epoll events, only used by us to print for this example
    epoll_pending_events: usize,
    /// Our event registrator which registers interest in events with the OS
    epoll_registrator: minimio::Registrator,
    // The handle to our epoll thread
    epoll_thread: thread::JoinHandle<()>,
    /// Channel used by both our threadpool and our epoll thread to send events
    /// to the main loop
    event_receiver: Receiver<PollEvent>,
    /// Creates an unique identity for our callbacks
    identity_token: usize,
    /// The number of events pending. When this is zero, we're done
    pending_events: usize,
    /// Handles to our threads in the threadpool
    thread_pool: Vec<NodeThread>,
}

대부분의 field들은 지금까지 설명드린 내용으로 어느정도 이해가 되실겁니다.

다음은 Runtime를 만드는 과정입니다.

impl Runtime {
    pub fn new() -> Self {
        // ===== THE REGULAR THREADPOOL =====
        let (event_sender, event_receiver) = channel::<PollEvent>();
        let mut threads = Vec::with_capacity(THREAD_POOL_SIZE);

        //여기서 thread를 생성합니다. 
        for i in 0..THREAD_POOL_SIZE {
            let (evt_sender, evt_reciever) = channel::<Task>();
            let event_sender = event_sender.clone();

            let handle = thread::Builder::new()
                .name(format!("pool{}", i))
                .spawn(move || {

                    while let Ok(task) = evt_reciever.recv() {
                        print(format!("received a task of type: {}", task.kind));

                        if let ThreadPoolTaskKind::Close = task.kind {
                            break;
                        };

                        let res = (task.task)();
                        print(format!("finished running a task of type: {}.", task.kind));

                        let event = PollEvent::Threadpool((i, task.callback_id, res));
                        event_sender.send(event).expect("threadpool");
                    }
                })
                .expect("Couldn't initialize thread pool.");

            let node_thread = NodeThread {
                handle,
                sender: evt_sender,
            };

            threads.push(node_thread);
        }

        // ===== EPOLL THREAD =====
        let mut poll = minimio::Poll::new().expect("Error creating epoll queue");
        let registrator = poll.registrator();

        let epoll_thread = thread::Builder::new()
            .name("epoll".to_string())
            .spawn(move || {
                let mut events = minimio::Events::with_capacity(1024);

                loop {
					// 여기서 mio 의 poll api 를 이용해서 준비된 event 가 있는지 확인을 합니다. 
                    // poll은 준비된 event 의 index 를 리턴하는데 이를 통해 준비된 event 들을 확인할수 있습니다. 
                    match poll.poll(&mut events, None) {
                        Ok(v) if v > 0 => {
                            for i in 0..v {
                                let event = events.get_mut(i).expect("No events in event list.");
                                print(format!("epoll event {} is ready", event.id()));

                                let event = PollEvent::Epoll(event.id());
                                event_sender.send(event).expect("epoll event");
                            }
                        }
                        Ok(v) if v == 0 => {
                            print("epoll event timeout is ready");
                            event_sender.send(PollEvent::Timeout).expect("epoll timeout");
                        }
                        Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {
                            print("received event of type: Close");
                            break;
                        }
                        Err(e) => panic!("{:?}", e),
                        _ => unreachable!(),
                    }
                }
            })
            .expect("Error creating epoll thread");

        Runtime {
            available_threads: (0..4).collect(),
            callbacks_to_run: vec![],
            callback_queue: HashMap::new(),
            epoll_pending_events: 0,
            epoll_registrator: registrator,
            epoll_thread,
            event_receiver,
            identity_token: 0,
            pending_events: 0,
            thread_pool: threads,
        }
    }
}

이 코드에서는 타임아웃 부분을 생략했는데 만약 timeout 이 있다면 poll() 에 None이 아닌 Some(i32)로 타임아웃 시간을 넣어주시면 됩니다.

Runtime::run() 구현하기

impl Runtime {
 	pub fn run(mut self, f: impl Fn()) {
    	//여기서 runtime 을 이렇게 구현한 이유는 runtime에 대한 static pointer를 유지해서 mock js function 들을 호출할때 직접 event 를 등록할수 		  //하기 위함입니다. 
        let rt_ptr: *mut Runtime = &mut self;
        unsafe { RUNTIME = rt_ptr };

        // just for us priting out during execution
        let mut ticks = 0;

        // First we run our "main" function
        f();

        // ===== EVENT LOOP =====
        while self.pending_events > 0 {
            ticks += 1;
            // NOT PART OF LOOP, JUST FOR US TO SEE WHAT TICK IS EXCECUTING
            print(format!("===== TICK {} =====", ticks));
            // ===== 1. CALLBACKS =====
            // Timer callbacks and if for some reason we have postponed callbacks
            // to run on the next tick. Not possible in our implementation though.
            self.run_callbacks();

            // ===== 2. POLL =====
            // First we need to check if we have any outstanding events at all
            // and if not we're finished. If not we will wait forever.
            if self.pending_events == 0 {
                break;
            }

            // We handle one and one event but multiple events could be returned
            // on the same poll. We won't cover that here though but there are
            // several ways of handling this.
            if let Ok(event) = self.event_receiver.recv() {
                match event {
                    PollEvent::Timeout => (),
                    PollEvent::Threadpool((thread_id, callback_id, data)) => {
                        self.process_threadpool_events(thread_id, callback_id, data);
                    }
                    PollEvent::Epoll(event_id) => {
                        self.process_epoll_events(event_id);
                    }
                }
            }
            self.run_callbacks();

            // ===== 3. CHECK =====
            // an set immediate function could be added pretty easily but we
            // won't do that here

            // ===== 4. CLOSE CALLBACKS ======
            // Release resources, we won't do that here, but this is typically
            // where sockets etc are closed.
        }

        // We clean up our resources, makes sure all destructors run.
        for thread in self.thread_pool.into_iter() {
            thread.sender.send(Task::close()).expect("threadpool cleanup");
            thread.handle.join().unwrap();
        }

        self.epoll_registrator.close_loop().unwrap();
        self.epoll_thread.join().unwrap();

        print("FINISHED");
    }
}

마무리하기

오늘 제가 다룬 간단한 Runtime 은 원본 코드에서 타임아웃 함수들을 제거해서 조금 더 간단하게 만든 버젼입니다. 만약 풀 버젼 및 자세할 설명을 보시고 싶으시다면 아래 링크를 따라가시면 됩니다.

위 예시를 통해 저희는 실제로 Runtime을 구현해봄으로써 Event loop 이 어떻게 구성되고 비동기 런타임이 어떻게 작업들을 스케쥴링하고 처리하는지에 대해 알수 있었습니다. 물론 단순히 글을 읽기보다는 실제로 코드로 구현해보고 다양한 테스트를 해보시면 더욱 깊게 이해하실수 있기 때문에 저는 아래 링크에 있는 원본 글을 정독하시는걸 추천드립니다!

마치면서

이번 편에서는 실제 코드를 작성하면서 Runtime를 구현해 해보았습니다. 다음 편에서는 Rust의 비동기 프레임워크의 두 축 중 하나인 Tokio에 대해 조금 더 깊게 알아보겠습니다~ 제가 부족한 부분이 많아 설명이 잘못되었거나 모호한 부분이 있을수 있습니다. 댓글로 편하게 지적해주시면 바로 수정하겠습니다~ 감사합니다~

이 글은 제가 근무하고 있는 크래프트테크놀로지스의 지원을 받아 작성되었습니다. 저희 회사에서는 여러 제품들에서 rust를 사용하고 있으며 Rust를 사용해서 더 멋있는 제품을 만드려고 노력중입니다. 같이 빠르고 안전한 금융 서비스분들을 아래 링크를 참조해주세요!

https://qraft.oopy.io/#utm_source=tech_blog&utm_medium=link&utm_campaign=recruit&utm_term=louis_lee&utm_content=async_rust

Reference

https://cfsamson.github.io/book-exploring-async-basics/introduction.html

profile
개발이 어려운 개발자

0개의 댓글