동시성

use std::thread;

fn main() {
    let handle = thread::spawn(|| {
        println!("new thread");
    });

    handle.join().unwrap();
}
  • 러스트는 위 처럼 std::thread 모듈로 스레드를 생성하고 제어할 수 있다
use std::thread;

fn main() {
    let handle = thread::spawn(|| {
        panic!("panic occured on thread")
    });

    match handle.join() {
        Ok(_) => {},
        Err(e) => {
            println!("Error occured inside of thread"); // 출력
        }
    }
    println!("process finised"); // 출력
}
  • thread의 join함수는 Result<T,E>를 반환하므로, 스레드 내부에서 발생한 panic은 복구 가능한 오류가 된다

스레드간 자료 주고받기

use std::{sync::mpsc, thread};

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        tx.send(100).unwrap();
    });

    let data = rx.recv().unwrap();

    println!("{}", data);
}
  • mpsc::channel 함수로 채널을 만들 수 있다
    • multi producer, single consumer의 약자이다
  • move 클로저는 스레드가 tx 변수를 캡처해 소유권을 해당 스레드가 가지도록 하는 키워드이다

async/await

future

[package]
name = "myrust"
version = "0.1.0"
edition = "2024"

[dependencies]
futures = "0.3"
  • async/await을 사용하려면 future crate를 의존성에 추가해야 한다
use futures::executor::block_on;

async fn hello_world() {
    println!("hello world");
}

fn main() {
    let future = hello_world();
    println!("called on main");

    block_on(future);
    println!("future finished");    
}
  • 비동기로 동작하는 함수 앞에 async를 붙혀 비동기 함수로 만들 수 있다
  • async 함수는 future로 만들어지고, block_on으로 실행할 수 있다

async/await

use futures::executor::block_on;

async fn calc_sum(start: i32, end: i32) -> i32 {
    let mut sum = 0;
    for i in start..=end {
        sum += i;
    }
    sum
}

async fn calc() -> i32 {
    let mut sum = 0;
    sum += calc_sum(1, 50).await;
    sum += calc_sum(51, 100).await;
    sum
}

fn main() {
    let fut = calc();
    let sum = block_on(fut);
    println!("{}", sum);
}
  • async 함수 안에서 await 키워드를 사용하면 다른 async 함수를 호출할 수 있다
  • await 키워드를 사용하면
    • 현재 작동중인 작업이 일시 중단되고
    • 해당 스레드의 이벤트 루프에 제어권이 반환된다
    • 이벤트 루프는 future를 실행하고
    • 실행 결과를 await을 사용한 async 함수에 전달한다

tokio

use std::{thread, time::Duration};

use futures::executor::block_on;

async fn sleep_10s() {
    for _ in 1..10 {
    	print!(".");
        thread::sleep(Duration::from_secs(1));
    }
}

async fn print_loop() {
    for i in 1..10 {
        print!("{} ", i);
    }
}

async fn wrapper() {
    let f1 = sleep_10s();
    let f2 = print_loop();
    futures::join!(f1, f2);
}

fn main() {
    let future = wrapper();
    block_on(future); // => .........1 2 3 4 5 6 7 8 9 출력
}
  • join!으로 두개의 future가 같은 이벤트 루프를 공유하는 상황이다
  • sleep_10s 함수의 sleep이, 다른 future인 print_loop도 막아버려 동시성을 가지지 못하게 되었다
use std::{time::Duration};
use tokio::time;

async fn sleep_10s() {
    for _ in 1..10 {
        print!(".");
        time::sleep(Duration::from_secs(1)).await;  // tokio 적용 1
    }
}

async fn print_loop() {
    for i in 1..10 {
        print!("{} ", i);
    }
}

async fn wrapper() {
    let f1 = sleep_10s();
    let f2 = print_loop();
    tokio::join!(f1, f2); // tokio 적용 2
}

#[tokio::main]
async fn main() {
    wrapper().await; // => .1 2 3 4 5 6 7 8 9 ........ 출력
}
  • tokio는 async, await을 쉽게 만드는 다양한 기능을 제공한다
  • std::thread의 sleep 대신 tokio의 sleep을 사용하여 동시성을 가지게 할 수 있다

이벤트 루프

  • 이벤트 루프와 스레드는 아래와 같은 장단점이 있다
이벤트루프스레드
특징단일 스레드를 사용
이벤트큐에 수신된 이벤트 처리를 무한 반복
스레드마다 별도의 작업이 할당됨
장점최소한의 오버헤드로 많은 이벤트 처리 가능
복잡한 동기화 매커니즘 필요 없음
cpu 코어가 충분한 경우 복수의 작업 병렬화 가능
단점단일 스레드이므로, cpu 연산이 많이 필요한 작업에 부적합복잡한 동기화 매커니즘이 필요함
cpu 개수보다 많은 동시 처리를 수행하는 경우 컨텍스트 스위칭으로 인한 오버헤드 발생

동시성 제어

  • 러스트는 동시성 제어를 위해 뮤텍스와 세마포어를 제공한다

뮤텍스

use std::{sync::Mutex, thread};

static counter: Mutex<i32> = Mutex::new(0);

fn inc_counter() {
    let mut num = counter.lock().unwrap();
    *num += 1;
}

fn main() {
    let mut thread_vec = vec![];

    for _ in 1..100 {
        thread_vec.push(thread::spawn(inc_counter));
    }

    for th in thread_vec {
        th.join().unwrap();
    }

    println!("{}", *counter.lock().unwrap());
}
  • 100개의 스레드가 counter 변수에 동시에 접근하는 예시
  • counter 변수를 mutex로 감싸서 만들 수 있다

세마포어

use std::{sync::{Arc, Mutex}, thread, time::Duration};
use tokio::sync::Semaphore;

static counter: Mutex<i32> = Mutex::new(0);

#[tokio::main]
async fn main() {
    let semaphore = Arc::new(Semaphore::new(2));
    let mut future_vec = vec![];

    for i in 1..100 {
        let semaphore = semaphore.clone();
        let future = tokio::spawn(async move {
            let permit = semaphore.acquire_owned().await.unwrap();
            println!("{} enter", i);
            tokio::time::sleep(Duration::from_millis(100)).await;
            let mut num = counter.lock().unwrap();
            *num += 1;
            drop(permit);
        });
        future_vec.push(future);
    }

    for th in future_vec {
        th.await.unwrap();
    }

    println!("{}", *counter.lock().unwrap());
}
  • 세마포어는 critical section을 직접 지정해야 한다
    • 여기서는 permit 변수로 획득 시점과 해제 시점으로 설정하였다
  • Arc<T>는 원자적 참조 카운트를 의미한다
    • 참조 카운트 계산이 원자적이지 못하면 자원 경쟁 문제가 생길 수 있다

데드락

use std::{thread, sync::{Arc, Mutex}};

fn main() {
    let lock_a = Arc::new(Mutex::new(0));
    let lock_b = Arc::new(Mutex::new(0));

    let lock_a_ref = lock_a.clone();
    let lock_b_ref = lock_b.clone();

    let thread1 = thread::spawn(move || { // 강제로 교착상태 만듬
        let a = lock_a.lock().unwrap();
        let b = lock_b_ref.lock().unwrap();
    });

    let thread2 = thread::spawn(move || {
        let a = lock_a_ref.lock().unwrap();
        let b = lock_b.lock().unwrap();
    });

    thread1.join().unwrap();
    thread2.join().unwrap();
}
  • 두 개 이상의 스레드가 각각 자원 해제를 기다리느라 시스템이 멈춘 상황

0개의 댓글