16장. 동시성

Gillilab - TechLog·2024년 11월 17일

Rust

목록 보기
17/21

16장. 동시성

Rust는 안전하고 효율적인 동시성 프로그래밍을 지원합니다. Rust의 소유권 시스템과 타입 시스템은 데이터 경합과 메모리 안전성을 보장하여 동시성 문제를 방지합니다. Rust에서 동시성을 구현하는 주요 방법에는 스레드, 메시지 패싱, 뮤텍스 등이 있습니다.

스레드

Rust의 표준 라이브러리는 std::thread 모듈을 통해 스레드를 지원합니다. 스레드를 사용하여 여러 작업을 병렬로 실행할 수 있습니다.

스레드 예제

use std::thread;
use std::time::Duration;

fn main() {
    let handle = thread::spawn(|| {
        for i in 1..10 {
            println!("Hello from the spawned thread! {}", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    for i in 1..5 {
        println!("Hello from the main thread! {}", i);
        thread::sleep(Duration::from_millis(1));
    }

    handle.join().unwrap();
}

위 예제에서 thread::spawn을 사용하여 새로운 스레드를 생성하고, join을 사용하여 스레드가 종료될 때까지 기다립니다. 메인 스레드와 생성된 스레드는 병렬로 실행됩니다.

메시지 패싱

Rust는 std::sync::mpsc 모듈을 통해 메시지 패싱을 지원합니다. mpsc는 다중 생산자, 단일 소비자(multiple producer, single consumer) 채널을 제공합니다.

메시지 패싱 예제

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("Hello from the spawned thread!");
        tx.send(val).unwrap();
        thread::sleep(Duration::from_millis(1));
    });

    let received = rx.recv().unwrap();
    println!("Received: {}", received);
}

위 예제에서 mpsc::channel을 사용하여 송신자(tx)와 수신자(rx)를 생성합니다. 생성된 스레드는 메시지를 송신하고, 메인 스레드는 메시지를 수신합니다.

뮤텍스

Rust는 std::sync::Mutex를 통해 뮤텍스를 지원합니다. 뮤텍스는 여러 스레드가 공유 데이터를 안전하게 수정할 수 있도록 합니다.

뮤텍스 예제

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();
            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}

위 예제에서 Arc(원자적 참조 카운팅 스마트 포인터)를 사용하여 Mutex를 여러 스레드가 공유할 수 있게 합니다. 각 스레드는 뮤텍스를 잠그고 공유 데이터를 수정합니다. 모든 스레드가 종료된 후, 최종 결과를 출력합니다.

Summary

  • thread

    • 스레드 생성과 join 처리

      use std::thread;
      use std::time::Duration;
      
      fn main() {
          let handle = thread::spawn(|| {
              for i in 1..10 {
                  println!("스레드에서 숫자 {}", i);
                  thread::sleep(Duration::from_millis(1));
              }
          });
          handle.join().unwrap();
      }
    • move 클로저를 이용한 스레드 간 데이터 이동

      use std::thread;
      
      fn main() {
          let v = vec![1, 2, 3];
      
          let handle = thread::spawn(move || {
              println!("벡터: {:?}", v); // v의 소유권이 클로저로 이동
          });
      
          // println!("{:?}", v); // 오류: v는 이미 이동됨
      
          handle.join().unwrap();
      }
    • 병렬 계산 예제

      use std::thread;
      
      fn main() {
          let numbers = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
          let chunks: Vec<_> = numbers.chunks(3).collect();
          let mut handles = vec![];
      
          // 벡터를 청크로 나누어 병렬 처리
          for chunk in chunks {
              let chunk_vec = chunk.to_vec();
              handles.push(thread::spawn(move || {
                  let sum: i32 = chunk_vec.iter().sum();
                  println!("청크 {:?}의 합: {}", chunk_vec, sum);
                  sum
              }));
          }
      
          // 각 스레드의 결과를 수집
          let total: i32 = handles.into_iter()
              .map(|h| h.join().unwrap())
              .sum();
      
          println!("전체 합: {}", total);
      }
  • 메시지 패싱

    • 채널을 통한 스레드 간 통신

      use std::sync::mpsc;
      use std::thread;
      
      fn main() {
          let (tx, rx) = mpsc::channel();
      
          thread::spawn(move || {
              let val = String::from("안녕하세요");
              tx.send(val).unwrap();
          });
      
          let received = rx.recv().unwrap();
          println!("수신: {}", received);
      }
    • 여러 생산자와 단일 소비자 패턴 구현

      use std::sync::mpsc;
      use std::thread;
      use std::time::Duration;
      
      fn main() {
          let (tx, rx) = mpsc::channel();
      
          // 여러 생산자 생성
          for i in 0..3 {
              let tx_clone = tx.clone();
              thread::spawn(move || {
                  let messages = vec![
                      format!("생산자 {} - 메시지 1", i),
                      format!("생산자 {} - 메시지 2", i),
                      format!("생산자 {} - 메시지 3", i),
                  ];
      
                  for msg in messages {
                      tx_clone.send(msg).unwrap();
                      thread::sleep(Duration::from_millis(100));
                  }
              });
          }
      
          // 원본 tx는 drop하여 채널이 닫힐 수 있게 함
          drop(tx);
      
          // 단일 소비자가 모든 메시지 수신
          while let Ok(received) = rx.recv() {
              println!("수신: {}", received);
          }
      }
  • 공유 상태

    • Mutex를 이용한 안전한 상태 공유

      use std::sync::{Arc, Mutex};
      use std::thread;
      
      fn main() {
          let counter = Arc::new(Mutex::new(0));
          let mut handles = vec![];
      
          for _ in 0..10 {
              let counter = Arc::clone(&counter);
              handles.push(thread::spawn(move || {
                  let mut num = counter.lock().unwrap();
                  *num += 1;
              }));
          }
      
          for handle in handles {
              handle.join().unwrap();
          }
      }
    • 데드락 방지 전략

      • 락 획득 순서 일관성 유지

        let mutex1 = Arc::new(Mutex::new(0));
        let mutex2 = Arc::new(Mutex::new(0));
        
        // 항상 mutex1을 먼저 획득하고 mutex2를 획득하는 순서 유지
        let _guard1 = mutex1.lock().unwrap();
        let _guard2 = mutex2.lock().unwrap();
      • 타임아웃 사용

        use std::time::Duration;
        
        if let Ok(_guard) = mutex.try_lock_for(Duration::from_secs(1)) {
            // 1초 안에 락을 획득하면 작업 수행
        } else {
            // 타임아웃 발생 시 다른 처리
        }
      • 데드락 감지기 활용

        • 순환 의존성 검사
        • 리소스 할당 그래프 모니터링
      • 락 세분화

        • 큰 임계 영역을 작은 단위로 분할
        • 락 보유 시간 최소화
  • Sync와 Send 트레잇

    • Send 트레잇 구현 예제

      #[derive(Debug)]
      struct MyBox<T>(T);
      
      unsafe impl<T: Send> Send for MyBox<T> {}
    • Sync 트레잇을 이용한 스레드 안전성 보장

      // Sync 트레잇 구현 예제
      #[derive(Debug)]
      struct Counter {
          count: Mutex<i32>,
      }
      
      // Mutex<T>가 Sync를 구현하므로 Counter도 Sync
      impl Counter {
          fn new() -> Counter {
              Counter {
                  count: Mutex::new(0),
              }
          }
      }
    • 사용자 정의 타입의 동시성 안전성 구현

      // 사용자 정의 타입에 대한 Sync 구현
      struct SharedData<T> {
          data: T,
      }
      
      unsafe impl<T: Sync> Sync for SharedData<T> {}
      
      // 사용 예시
      let data = Arc::new(SharedData {
          data: vec![1, 2, 3]
      });
      let data_clone = Arc::clone(&data);
      
      thread::spawn(move || {
          println!("다른 스레드에서 데이터 접근: {:?}", data_clone.data);
      });

참조: https://doc.rust-lang.org/book/ch16-00-concurrency.html

0개의 댓글