이제 tokio의 동시성을 조금 배웠으니 이걸 client 측에 적용해보자.
이전에 작성한 서버 코드를 명시적 바이너리 파일에 넣는다.
mkdir src/bin
mv src/main.rs src/bin/server.rs
클라이언트 코드를 포함할 새 바이너리 파일을 만든다.
touch src/bin/client.rs
실행하고 싶을 때마다 먼저 별도의 터미널 창에서 서버를 시작해야한다.
cargo run --bin server
그런 다음 클라이언트는 별도의 터미널에서 다음을 수행한다.
cargo run --bin client
이제 두개의 동시 Redis 명령어을 실행하고 싶다고 가정하자. 명령어당 하나의 작업을 생성할 수 있으므로, 두개의 명령어가 동시에 발생한다.
먼저 다음과 같이 시도한다.
use mini_redis::client;
#[tokio::main]
async fn main() {
// Establish a connection to the server
let mut client = client::connect("127.0.0.1:6379").await.unwrap();
// Spawn two tasks, one gets a key, the other sets a key
let t1 = tokio::spawn(async {
let res = client.get("foo").await;
});
let t2 = tokio::spawn(async {
client.set("foo", "bar".into()).await;
});
t1.await.unwrap();
t2.await.unwrap();
}
위의 코드는 여러 비동기 작업이 동일한 Client 자원에 접근해야 하는 상황에서 컴파일 에러가 발생한다. 이는 Client가 Copy 트레이트를 구현하지 않아 자동 복사가 되지 않아, 여러 작업이 Client를 공유하거나 접근하기 어렵게 만든다. 게다가 Client::set 메서드가 &mut self를 인자로 가지고 있어서, 이 메서드를 호출하기 위해서는 Client에 대한 독점(배타적인) 접근 권한이 필요하다. 일반적으로는 std::sync::Mutex를 사용하여 여러 작업이 공유 자원에 안전하게 접근할 수 있지만, .await를 호출하는 동안에는 뮤텍스를 보유한 상태로 대기할 수 없기 때문에 사용하기 어렵다. tokio::sync::Mutex를 사용하면 비동기 작업이 공유 자원에 접근하는 것을 조정할 수 있지만, 해당 방법은 한 번에 하나의 요청만 처리하게 되므로 만약 클라이언트가 파이프라이닝(pipelining)을 지원한다면, 비동기 뮤텍스의 사용이 비효율적일 것이다...
답은 메세지 패싱에 있다. 이 패턴에는 client 자원을 관리하기 위한 전용 작업 생성이 포함된다. 요청을 발행하려는 모든 작업은 해당 client 작업에 메시지를 보낸다. client 작업은 sender를 대신해 요청을 발행하고, 응답은 sender 에게 다시 전송된다.
이 전략을 사용하면 단일 연결이 생성된다. client를 관리하는 작업은 get과 set을 호출하기 위한 독점 접근 권한을 얻을 수 있고, 채널은 버퍼로 작동한다. client작업이 진행되는 동안 client작업으로 Operations가 전송될 수 있다. client작업이 새로운 요청을 처리할 수 있게 되면, 채널로 부터 다음 요청을 가져온다. 이로 인해 처리량이 향상되고 연결 풀링이 가능해진다.
tokio는 여러 다른 목적의 채널들을 제공한다.
단일 소비자가 각각의 메세지를 볼 수 있는 multi-producer multi-consumer채널이 필요한 경우, async-channel크레이트를 쓸 수 있다. 또한 동기적인 rust코드에서 사용되는 std::sync::mpsc, crossbeam::channel 들도 있다. 이 채널들은 스레드를 블락하면서 메세지를 기다리는데, 비동기 코드에서는 사용할 수 없다.
메세지 패싱을 사용하는 대부분의 경우에, 메세지를 받는 작업은 둘 이상의 명령에 응답한다. GET과 SET 명령에 응답하게 해보자. 먼저 Command 열거형을 만들고 각 명령 타입 변수를 포함시킨다.
use bytes::Bytes;
#[derive(Debug)]
enum Command {
Get {
key: String,
},
Set {
key: String,
val: Bytes,
}
}
main함수에서 mpsc채널을 생성한다.
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
// Create a new channel with a capacity of at most 32.
let (tx, mut rx) = mpsc::channel(32);
// ... Rest comes here
}
mpsc채널은 redis 연결을 관리하는 작업에 명령을 전송하기 위해 사용한다. 다중 생산자 기능은 여러 작업들로 부터 메세지를 보낼 수 있게 한다. 채널을 생성하면 두개의 값을 리턴하는데, 송신자(sender)와 수신자(receiver)이다. 이 두 핸들은 분리되어 있고, 다른 작업으로 이동될 수 있다. 위의 채널은 32개의 용량으로 생성됐다. 만약 메세지가 그들이 받는 것보다 빠르게 보내진다면 채널은 그것들을 저장할 것이다. 채널에 32개의 메세지가 저장되면 send(...).await를 호출하고, 메세지가 수신자에 의해 제거될 때까지 sleep한다.
여러 작업으로부터 메세지를 전송하려면 Sender를 clone한다.
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel(32);
let tx2 = tx.clone();
tokio::spawn(async move {
tx.send("sending from first handle").await;
});
tokio::spawn(async move {
tx2.send("sending from second handle").await;
});
while let Some(message) = rx.recv().await {
println!("GOT = {}", message);
}
}
두 메세지가 단일 수신자에게 전송됐다. mpsc채널에서 수신자는 복제할 수 없다.
모든 송신자가 스코프를 벗어나거나 드랍된다면, 채널에 더이상의 메세지를 보낼 수 없다. 이 시점에서 수신자가 recv를 호출하면 None을 반환하는데, 모든 송신자가 끝났고 채널이 닫혔다는 의미다.
다음으로, 채널의 메세지를 처리하는 작업을 생성한다. 먼저 Redis에 대한 클라이언트 연결이 설정된다. 수신된 명령은 Redis 연결을 통해 실행됩니다.
use mini_redis::client;
// The `move` keyword is used to **move** ownership of `rx` into the task.
let manager = tokio::spawn(async move {
// Establish a connection to the server
let mut client = client::connect("127.0.0.1:6379").await.unwrap();
// Start receiving messages
while let Some(cmd) = rx.recv().await {
use Command::*;
match cmd {
Get { key } => {
client.get(&key).await;
}
Set { key, val } => {
client.set(&key, val).await;
}
}
}
});
이제 Redis 연결에서 직접 명령을 실행하는 대신 채널을 통해 명령을 보내도록 두 작업을 업데이트한다.
// The `Sender` handles are moved into the tasks. As there are two
// tasks, we need a second `Sender`.
let tx2 = tx.clone();
// Spawn two tasks, one gets a key, the other sets a key
let t1 = tokio::spawn(async move {
let cmd = Command::Get {
key: "foo".to_string(),
};
tx.send(cmd).await.unwrap();
});
let t2 = tokio::spawn(async move {
let cmd = Command::Set {
key: "foo".to_string(),
val: "bar".into(),
};
tx2.send(cmd).await.unwrap();
});
main함수 하단에서는 조인핸들을 .await하여 프로세스가 종료되기 전에 명령이 완전히 완료되도록 한다.
t1.await.unwrap();
t2.await.unwrap();
manager.await.unwrap();
마지막 단계는 관리자 작업으로부터 응답을 다시 받는 것이다. GET 명령은 값을 얻어야하고, SET 명령은 작업이 완전히 성공했는지 알아야한다.
응답을 건네주기 위해, oneshot채널을 사용한다. oneshot채널은 spsc 채널로 단일 값을 보내는데 최적화되어 있다.
mpsc와 유사하게 oneshot::channel()은 단일 송신자와 수신자 핸들을 반환한다.
use tokio::sync::oneshot;
let (tx, rx) = oneshot::channel();
mpsc와 달리 용량이 언제나 1이고, 핸들 복제도 안된다. 관리자 작업으로 부터 응답을 받기 위해, 명령을 보내기 전에 oneshot채널이 생성된다. 채널의 절반인 Sender는 관리자 작업에 대한 명령에 포함된다. 수신 절반은 응답을 수신하는 데 사용된다. 일단 Command를 업데이트하자.
use tokio::sync::oneshot;
use bytes::Bytes;
/// Multiple different commands are multiplexed over a single channel.
#[derive(Debug)]
enum Command {
Get {
key: String,
resp: Responder<Option<Bytes>>,
},
Set {
key: String,
val: Bytes,
resp: Responder<()>,
},
}
/// Provided by the requester and used by the manager task to send
/// the command response back to the requester.
type Responder<T> = oneshot::Sender<mini_redis::Result<T>>;
이제 oneshot::Sender가 포함된, 명령을 발행하는 작업을 업데이트한다.
let t1 = tokio::spawn(async move {
let (resp_tx, resp_rx) = oneshot::channel();
let cmd = Command::Get {
key: "foo".to_string(),
resp: resp_tx,
};
// Send the GET request
tx.send(cmd).await.unwrap();
// Await the response
let res = resp_rx.await;
println!("GOT = {:?}", res);
});
let t2 = tokio::spawn(async move {
let (resp_tx, resp_rx) = oneshot::channel();
let cmd = Command::Set {
key: "foo".to_string(),
val: "bar".into(),
resp: resp_tx,
};
// Send the SET request
tx2.send(cmd).await.unwrap();
// Await the response
let res = resp_rx.await;
println!("GOT = {:?}", res);
});
마지막으로, oneshot채널을 통해 응답을 보내도록 관리자 작업을 업데이트한다.
while let Some(cmd) = rx.recv().await {
match cmd {
Command::Get { key, resp } => {
let res = client.get(&key).await;
// Ignore errors
let _ = resp.send(res);
}
Command::Set { key, val, resp } => {
let res = client.set(&key, val).await;
// Ignore errors
let _ = resp.send(res);
}
}
}
oneshot::Sender에서 send호출은 즉시 완료되고 .await가 필요없다. 왜냐하면 oneshot채널에서의 send는 어떠한 형태의 기다림없이 항상 즉각 성공하거나 실패할 것이기 때문이다.
oneshot 채널에서 값을 전송할 때, 수신자가 이미 드롭(Dropped)되어 더 이상 응답을 받을 준비가 되어있지 않은 상태라면, resp.send(...) 함수는 Err을 반환한다. 수신자가 결과에 더 이상 관심을 가지지 않는 경우가 발생할 수 있는데, 수신자가 이미 종료되었거나 더 이상 해당 결과를 기다리지 않는다면, Err을 처리할 필요가 없다.
use bytes::Bytes;
use mini_redis::client;
use tokio::sync::{mpsc, oneshot};
/// Multiple different commands are multiplexed over a single channel.
#[derive(Debug)]
enum Command {
Get {
key: String,
resp: Responder<Option<Bytes>>,
},
Set {
key: String,
val: Bytes,
resp: Responder<()>,
},
}
/// Provided by the requester and used by the manager task to send the command
/// response back to the requester.
type Responder<T> = oneshot::Sender<mini_redis::Result<T>>;
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel(32);
// Clone a `tx` handle for the second f
let tx2 = tx.clone();
let manager = tokio::spawn(async move {
// Open a connection to the mini-redis address.
let mut client = client::connect("127.0.0.1:6379").await.unwrap();
while let Some(cmd) = rx.recv().await {
match cmd {
Command::Get { key, resp } => {
let res = client.get(&key).await;
// Ignore errors
let _ = resp.send(res);
}
Command::Set { key, val, resp } => {
let res = client.set(&key, val).await;
// Ignore errors
let _ = resp.send(res);
}
}
}
});
// Spawn two tasks, one setting a value and other querying for key that was
// set.
let t1 = tokio::spawn(async move {
let (resp_tx, resp_rx) = oneshot::channel();
let cmd = Command::Get {
key: "foo".to_string(),
resp: resp_tx,
};
// Send the GET request
if tx.send(cmd).await.is_err() {
eprintln!("connection task shutdown");
return;
}
// Await the response
let res = resp_rx.await;
println!("GOT (Get) = {:?}", res);
});
let t2 = tokio::spawn(async move {
let (resp_tx, resp_rx) = oneshot::channel();
let cmd = Command::Set {
key: "foo".to_string(),
val: "bar".into(),
resp: resp_tx,
};
// Send the SET request
if tx2.send(cmd).await.is_err() {
eprintln!("connection task shutdown");
return;
}
// Await the response
let res = resp_rx.await;
println!("GOT (Set) = {:?}", res);
});
t1.await.unwrap();
t2.await.unwrap();
manager.await.unwrap();
}