tokio에서의 shared state

손호준·2023년 12월 22일

https://tokio.rs/tokio/tutorial/shared-state

tokio에서 shared state 하는 방법

  1. shared state를 Mutex로 보호(guard)한다.
  2. 상태(state) 관리를 위한 작업(task)을 생성(spawn)하고,이걸 실행하기 위해 메세지 패싱을 사용한다.

일반적으로 간단한 데이터에는 1번 방식을 사용하고, I/O 기본 요소와 같은 비동기 작업이 필요한 작업에는 2번 방식을 사용한다.

지금부터 알아 볼 예시에서 공유 상태는 HashMap이고 수행할 작업은 insertget이다. 두 작업 모두 비동기 작업이므로 Mutex를 사용한다.

1. toml 파일의 [dependencies] 항목에 다음 사항을 추가한다.

bytes = "1"

bytes 크레이트는 네트워크 프로그래밍에서 강력한 바이트 배열 구조를 제공한다. bytes 크레이트가 Vec<u8>와 다른 가장 큰 특징은 얕은 복사(shallow cloning) 인데, Bytes 인스턴스에 clone()을 호출해도 내부의 데이터가 복사되지 않는다. 대신 Bytes 인스턴스는 일부 데이터에 대한 참조 카운트 핸들(reference-counted handle)이다. Byte 타입은 대략 Arc<Vec<u8>>에 몇가지 기능을 추가한 것과 비슷하다고 보면 된다.

2. HashMap 초기화

HashMap이 많은 작업과 많은 스레드에서 공유될 것이므로, 이걸 지원하기 위해 Arc<Mutex<_>>로 감싼다.
다음과 같이 타입 별칭을 추가한다.

type Db = Arc<Mutex<HashMap<String, Bytes>>>;

이제 main 함수를 업데이트 할건데, HashMap을 초기화 하고 Arc핸들을 process함수에 넘겨준다.
Arc를 사용하면 HashMap이 여러 작업에서 동시에 참조되고, 잠재적으로 여러 스레드에서 실행될 수 있게 해준다. tokio에서 핸들 이라는 용어는 일부 공유 상태에 대해 접근하는 값을 참조하는 데 사용된다.

use tokio::net::TcpListener;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

type Db = Arc<Mutex<HashMap<String, Bytes>>>;


#[tokio::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();

    println!("Listening");

    let db = Arc::new(Mutex::new(HashMap::new()));

    loop {
        let (socket, _) = listener.accept().await.unwrap();
        // Clone the handle to the hash map.
        let db = db.clone();

        println!("Accepted");
        tokio::spawn(async move {
            process(socket, db).await;
        });
    }

std::sync::Mutex vs tokio::sync::Mutex

  • std::sync::Mutex는 표준 라이브러리인 std의 모듈에 있는 동기화된 뮤텍스(Mutex)이며, 일반적인 동기화 방법으로 사용된다. 스레드가 락을 얻기 위해 대기할 때, 해당 스레드는 블록되므로 다른 작업이 처리되지 못하도록 차단될 수 있다. 저렴한 동시성이 요구되며, .await 호출 사이에서 락이 유지될 필요가 없는 경우에 사용한다.
  • tokio::sync::Mutex는 Tokio 라이브러리의 모듈에 있는 뮤텍스로, 비동기 코드에서 .await 호출 사이에서 락을 유지하는 뮤텍스다. 내부적으로는 동기 뮤텍스를 사용하므로 비동기 코드에서 락을 유지하는 동안 블록될 수 있다. 때문에 비동기 코드라고 무조건 적으로 tokio::sync::Mutex를 사용하는 것은 도움이 되지 않을 수 있고, 오히려 std::sync::Mutex를 사용하는 것이 더 효율적일 수 있다.

3. process()함수를 업데이트

HashMap(db)을 사용하기 전에 lock을 얻도록 수정한다. 또한 HashMap이 이젠 Bytes타입 이므로 저렴하게 clone할 수 있다.

use tokio::net::TcpStream;
use mini_redis::{Connection, Frame};

async fn process(socket: TcpStream, db: Db) {
    use mini_redis::Command::{self, Get, Set};

    // Connection, provided by `mini-redis`, handles parsing frames from
    // the socket
    let mut connection = Connection::new(socket);

    while let Some(frame) = connection.read_frame().await.unwrap() {
        let response = match Command::from_frame(frame).unwrap() {
            Set(cmd) => {
                let mut db = db.lock().unwrap();
                db.insert(cmd.key().to_string(), cmd.value().clone());
                Frame::Simple("OK".to_string())
            }           
            Get(cmd) => {
                let db = db.lock().unwrap();
                if let Some(value) = db.get(cmd.key()) {
                    Frame::Bulk(value.clone())
                } else {
                    Frame::Null
                }
            }
            cmd => panic!("unimplemented {:?}", cmd),
        };

        // Write the response to the client
        connection.write_frame(&response).await.unwrap();
    }
}

작업, 스레드, 경합

짧은 critical sections을 보호하기 위해 블로킹 뮤텍스를 사용하는 것은 경합이 최소화 될 때 허용되는 전략이다. 락의 경합이 발생하면, 작업을 실행하는 스레드는 반드시 뮤텍스를 블락하고 기다려야한다. 이것은 현재 작업을 블락할 뿐 아니라, 현재 스레드에 스케쥴된 다른 모든 작업들을 블락할 것이다.

기본적으로 tokio 런타임은 멀티스레드 스케쥴러를 사용한다. 작업은 런타임에서 관리하는 스레드 수에 관계없이 예약된다. 만약 실행되기 위한 많은 수의 작업이 스케쥴되었고 그 작업들이 모두 뮤텍스에 대한 접근을 필요로 할 때, 경합이 발생한다. 반면에 current_thread런타임을 사용하면 뮤텍스가 경합되지 않는다. current_thread런타임은 가벼운 싱글스레드 런타임으로, 적은 작업을 생성하고 소수의 소켓을 열 때 좋은 선택이다. 예를 들어 이 옵션은 비동기 클라이언트 라이브러리 위에 동기 API 브리지를 제공할 때 잘 작동한다.

만약 동기식 뮤텍스의 경합이 문제라면, 가장 좋은 해결 방법은 tokio 뮤텍스로 전환하는 것이다. 대신 다음의 사항들을 고려해야 한다.

  • 상태를 관리하는 전용 작업으로 변경하고 메세지 패싱을 사용한다.
  • 뮤텍스를 샤딩(shard)한다.
  • 뮤텍스를 피하기 위해 코드를 재구조화 한다.

key가 독립적이라면, 뮤텍스 샤딩이 적절할 것이다. 단일 Mutex<HashMap<_, _>> 인스턴스를 갖는 대신, 별도의 N 인스턴스를 도입한다.

type ShardedDb = Arc<Vec<Mutex<HashMap<String, Vec<u8>>>>>;

fn new_sharded_db(num_shards: usize) -> ShardedDb {
    let mut db = Vec::with_capacity(num_shards);
    for _ in 0..num_shards {
        db.push(Mutex::new(HashMap::new()));
    }
    Arc::new(db)
}

주어진 어떠한 key로 cell을 찾는 과정은 두 단계이다. 먼저 key는 어느 샤드에 속해 있는지 식별하는데 사용되고, 해당 키가 HashMap에서 찾는다.

let shard = db[hash(key) % db.len()].lock().unwrap();
shard.insert(key, value);

위에 설명된 간단한 구현에서는 고정된 개수의 샤드를 사용해야 하며, 샤드 맵이 생성되면 샤드 수를 변경할 수 없다. dashmap 크레이트 는 보다 정교한 분할된 해시 맵 구현을 제공한다.

샤딩(sharding)이란?

샤딩은 대규모 db나 분산 시스템에서 데이터를 관리하기 위한 방법중 하나다. db나 시스템이 커지면서 단일 서버로 처리하기 어려운 양의 데이터를 처리하기 위해 사용된다. 주로 데이터를 여러 파티션(혹은 샤드)로 나누어 각각의 파티션에 데이터를 분산시키는 기술이다. 각 샤드는 독립적으로 동작하며, db의 부하를 분산시키고 처리량을 증가시킨다. 각 샤드는 db의 일부를 담당하며 전체 db의 일관성을 유지하기 위해 중앙 집중식이 아닌 분산 처리를 통해 동작한다. 샤딩을 사용하면 수평 확장이 가능해지고, 더 많은 데이터를 저장하고 처리할 수 있게 된다.

.await 실행에서 MutexGuard 유지하기

use std::sync::{Mutex, MutexGuard};

async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
    let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
    *lock += 1;

    do_something_async().await;
} // lock goes out of scope here

이 코드는 다음 에러를 발생시킨다.

error: future cannot be sent between threads safely
   --> src/lib.rs:13:5
    |
13  |     tokio::spawn(async move {
    |     ^^^^^^^^^^^^ future created by async block is not `Send`
    |
   ::: /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.21/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ---- required by this bound in `tokio::task::spawn::spawn`
    |
    = help: within `impl std::future::Future`, the trait `std::marker::Send` is not implemented for `std::sync::MutexGuard<'_, i32>`
note: future is not `Send` as this value is used across an await
   --> src/lib.rs:7:5
    |
4   |     let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
    |         -------- has type `std::sync::MutexGuard<'_, i32>` which is not `Send`
...
7   |     do_something_async().await;
    |     ^^^^^^^^^^^^^^^^^^^^^^^^^^ await occurs here, with `mut lock` maybe used later
8   | }
    | - `mut lock` is later dropped here

이 에러는 std::sync::MutexGuard타입이 Send트레잇을 구현하지 않았는데, tokio 런타임이 모든 .await의 실행마다 작업을 스레드끼리 이동시켜 버릴 수 있기 때문에 발생한다. 따라서 뮤텍스락을 다른 스레드로 안전하게 보낼 수 없다고 알려주고 있다. 이것을 피하기 위해 .await이 실행되기 전에 뮤텍스 락의 소멸자를 구현해줘야하 한다.

// This works!
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
    {
        let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
        *lock += 1;
    } // lock goes out of scope here

    do_something_async().await;
}

중괄호로 scope을 만들어 이걸 벗어나면 뮤텍스 락이 drop되도록 해주었다.
하지만 아래의 코드는 실행되지 않는다.

use std::sync::{Mutex, MutexGuard};

// This fails too.
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
    let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
    *lock += 1;
    drop(lock);

    do_something_async().await;
}

현재 컴파일러는 퓨쳐의 Send 구현 여부를 범위 정보에 기반해서만 계산하기 때문이다. 명시적으로 drop 하는건 안된다고 생각해두자.

Tokio 런타임에서 뮤텍스를 사용하는 경우, 현재 작업이 뮤텍스의 락을 얻은 상황에서 .await 호출하여 작업이 중단 되면, 동일한 스레드에서 다른 작업이 락을 얻으려는 시도를 하면서 데드락이 발생할 수 있으므로 조심해야한다. 이 문제를 해결하기 위해 다음 몇 가지 방법이 있다.

1. .await을 사용할 때 락을 얻지 않도록 코드를 수정한다.

뮤텍스를 구조체로 래핑하고, 뮤텍스 락을 non-async 메소드 안에서만 사용한다.

use std::sync::Mutex;

struct CanIncrement {
   mutex: Mutex<i32>,
}
impl CanIncrement {
   // This function is not marked async.
   fn increment(&self) {
       let mut lock = self.mutex.lock().unwrap();
       *lock += 1;
   }
}

async fn increment_and_do_stuff(can_incr: &CanIncrement) {
   can_incr.increment();
   do_something_async().await;
}

이렇게 하면 뮤텍스 보호가 비동기 함수 안에 나타나지 않기 때문에 Send에러를 막을 수 있다.

2. 상태를 관리할 작업을 생성하고, 해당 작업을 실행하기 위해 메세지 패싱을 사용한다.

자세한건 다음 장에서 살펴보자.

3. tokio의 비동기 뮤텍스를 사용한다.

tokio 뮤텍스의 주된 특징은 .await를 실행할 때, 문제 없이 뮤텍스를 들고 있을 수 있다는 것이다. 하지만 비동기 뮤텍스는 비용이 많이 드니까 다른 방법을 사용하는게 낫다.

use tokio::sync::Mutex; // note! This uses the Tokio mutex

// This compiles!
// (but restructuring the code would be better in this case)
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
    let mut lock = mutex.lock().await;
    *lock += 1;

    do_something_async().await;
} // lock goes out of scope here
profile
Rustacean🦀

0개의 댓글