Rocks DB 스토리지 구현 2)

Tasker_Jang·2024년 9월 8일
0

1. 필요한 라이브러리 및 모듈 임포트

use rocksdb::{DB, Options};
use crate::{StableStorage, Result, Entry};
use std::sync::Arc;
use raft::logger::Logger;
  • rocksdb::{DB, Options}:
    DB: RocksDB 데이터베이스와 상호작용할 수 있는 기능을 제공합니다. 데이터를 저장하거나 조회하는 데 사용됩니다.

    Options: RocksDB의 다양한 동작을 설정하는 구성 옵션을 제공합니다. 예를 들어, 캐시 크기 설정, 압축 여부 등을 지정할 수 있습니다.

  • crate::{StableStorage, Result, Entry}:

    Rust에서 crate는 패키지 또는 라이브러리를 의미합니다. Rust 프로그램에서 여러 모듈로 나눠진 코드를 그룹화한 것이 crate입니다. 하나의 crate는 실행 파일일 수도 있고, 라이브러리일 수도 있습니다.

  • Binary crate: 독립적으로 실행 가능한 프로그램을 나타냅니다.

  • Library crate: 재사용 가능한 라이브러리 코드로 구성됩니다.

    use crate::{...};는 현재 crate에서 필요한 모듈, 구조체, 함수 등을 가져와 사용하기 위한 구문입니다.

  1. StableStorage: StableStorage는 영구적으로 데이터를 저장하고 관리하기 위한 트레이트(trait)로, 주로 분산 시스템에서 로그 엔트리를 안전하게 저장하는 역할을 합니다. 이는 Raft와 같은 합의 알고리즘에서 상태와 로그 데이터를 저장하는 데 사용됩니다.

  2. Result: Result는 Rust의 표준 라이브러리에서 제공하는 열거형 타입으로, 함수가 성공하거나 실패할 수 있음을 나타냅니다. 성공 시 Ok 값을, 실패 시 Err 값을 반환합니다.

  3. Entry: Entry는 로그의 항목을 나타내는 데이터 구조체로, 주로 분산 시스템에서 상태 머신에 저장되는 각 명령어 또는 이벤트를 나타냅니다.

  4. trait: 트레이트(trait)는 Rust에서 공통된 기능(메서드)을 정의하는 인터페이스입니다. 트레이트는 여러 구조체나 타입이 공통으로 구현해야 하는 동작을 명시하는데 사용됩니다. 트레이트를 구현한 타입은 해당 트레이트에 정의된 메서드를 사용할 수 있습니다.

2. RocksDBStorage 구조체 정의

pub struct RocksDBStorage {
    db: DB,
    logger: Arc<dyn Logger>,
}
  1. pub: 공개 가시성 수정자. 이 구조체는 모듈 외부에서도 접근할 수 있습니다.
  2. struct: 구조체를 정의하는 키워드로, Rust에서 사용자 정의 데이터 타입입니다.
  3. RocksDBStorage: 구조체의 이름으로, RocksDB와 상호작용하는 데 사용됩니다.
  4. db: DB: db 필드는 RocksDB 라이브러리의 DB 타입을 사용한 데이터베이스 인스턴스입니다.
  5. logger: Arc<dyn Logger>: logger 필드는 여러 스레드에서 안전하게 공유될 수 있는 Logger 트레이트를 구현한 로거 객체입니다. Arc는 원자적 참조 카운터로, 공유 참조를 제공합니다.

1. 모듈 외부에서도 접근 가능해야 하는 이유

구조체를 pub으로 선언하면 모듈 외부에서도 접근할 수 있습니다. 이는 해당 구조체를 다른 파일이나 모듈에서 사용하거나 인스턴스화할 수 있게 합니다. 외부에서 RocksDB와 상호작용하려면 RocksDBStorage 구조체에 접근이 필요합니다.

2. 구조체를 쓰는 이유

구조체는 여러 데이터를 한 곳에 묶어서 다룰 수 있게 해줍니다. RocksDBStorage는 데이터베이스와 로거를 묶어 관련 기능을 한 곳에서 처리하도록 합니다.

3. 인스턴스

인스턴스는 구조체의 구체적인 사용 예시 또는 실체입니다. 예를 들어, RocksDBStorage의 인스턴스를 만들면 실제 데이터베이스와 로거 객체가 해당 인스턴스에 포함됩니다.

4. Arc

Arc는 원자적 참조 카운터(Atomic Reference Counter)의 약자입니다. Rust에서 Arc는 여러 스레드에서 안전하게 객체를 공유할 수 있게 하며, 객체가 더 이상 사용되지 않으면 메모리를 자동으로 해제합니다.

5. 공유 참조

공유 참조는 여러 참조자가 동시에 데이터를 읽을 수 있게 하는 방식입니다. Arc를 사용하면 하나의 객체를 여러 스레드에서 안전하게 참조할 수 있으며, 데이터의 불변성을 보장합니다.

3. RocksDBStorage 생성 함수

impl RocksDBStorage {
    pub fn create(path: &str, logger: Arc<dyn Logger>) -> Result<Self> {
        let mut opts = Options::default();
        opts.create_if_missing(true);
        let db = DB::open(&opts, path)?;
        Ok(RocksDBStorage { db, logger })
    }
}
  • create: 새로운 RocksDBStorage 인스턴스를 생성하는 함수입니다.

  • path: &str: 데이터베이스 파일 경로를 문자열 슬라이스로 받습니다.

  • logger: Arc: Logger 트레이트를 구현한 객체의 참조를 원자적 참조 카운터로 받습니다. 이는 여러 스레드에서 공유될 수 있습니다.

  • Result: 성공 시에는 RocksDBStorage의 인스턴스를, 실패 시에는 에러를 반환하는 타입입니다.

  • let mut opts: Options 객체를 가변 변수로 생성합니다.

  • Options::default(): RocksDB의 기본 옵션을 가져옵니다.

  • opts.create_if_missing(true): 데이터베이스 파일이 없으면 새로 생성하도록 설정합니다.

  • DB::open: 주어진 옵션과 경로를 사용해 데이터베이스를 엽니다.

  • Ok(RocksDBStorage { db, logger }): 성공적으로 데이터베이스가 열리면 RocksDBStorage의 인스턴스를 반환합니다.

4. StableStorage 트레이트 구현

impl StableStorage for RocksDBStorage {
    fn append(&mut self, entries: &[Entry]) -> Result<()> {
        for entry in entries {
            let key = format!("{:020}", entry.index);
            let value = entry.write_to_bytes()?;
            self.db.put(key, value)?;
        }
        Ok(())
    }
  • append 메서드: 여러 로그 엔트리를 받아서 RocksDB에 저장합니다.
    • entries: Raft 로그 엔트리들의 슬라이스입니다.
    • for 루프에서 각 엔트리의 인덱스를 키로 사용해 key를 생성하고, 엔트리 데이터를 직렬화한 후 value에 저장합니다.
    • self.db.put 메서드를 통해 생성된 keyvalue를 RocksDB에 저장합니다.

5. HardState 읽기 및 쓰기 구현

    fn hard_state(&self) -> Result<HardState> {
        let result = self.db.get("hard_state")?;
        match result {
            Some(data) => Ok(HardState::decode(&*data)?),
            None => Ok(HardState::default()),
        }
    }
  1. fn hard_state(&self) -> Result:

    • hard_state라는 메서드를 정의하며, 이 메서드는 HardState 타입을 반환합니다. 반환값은 성공하면 HardState, 실패 시 에러를 포함한 Result 타입입니다.
    • &self는 이 메서드가 구조체의 인스턴스에 대해 동작하는 것을 의미하며, 구조체의 값을 변경하지 않으므로 &self로 참조합니다.
  2. let result = self.db.get("hard_state")?:

    • self.dbRocksDB의 데이터베이스 객체를 가리키며, "hard_state"라는 키로 값을 가져옵니다. 이 부분에서 값이 없을 경우 에러를 반환할 수 있으므로 ?를 사용하여 에러 처리를 자동으로 처리합니다.
  3. match result:

    • result는 데이터베이스에서 가져온 값입니다. 이 값이 Some(data)일 경우, 데이터를 사용하고, 값이 없을 경우(None) 기본값을 반환합니다.
  4. Some(data) => Ok(HardState::decode(&*data)?):

    • 만약 값이 있다면 data를 사용하여 HardState로 디코딩한 후, 이를 Ok로 감싸서 반환합니다. 디코딩 중 에러가 발생할 경우 이를 처리합니다.
  5. None => Ok(HardState::default()):

    • 데이터가 없을 경우 HardState의 기본값을 반환합니다.

이 메서드는 RocksDB에서 hard_state라는 키에 저장된 데이터를 가져오고, 그 데이터를 디코딩하여 반환하는 역할을 합니다. 데이터가 없다면 기본값을 반환하는 방식으로 동작합니다.

    fn set_hard_state(&mut self, hard_state: &HardState) -> Result<()> {
        let value = hard_state.write_to_bytes()?;
        self.db.put("hard_state", value)?;
        Ok(())
    }
  1. fn set_hard_state(&mut self, hard_state: &HardState) -> Result<()>:

    • set_hard_stateHardState 객체를 받아 이를 데이터베이스에 저장하는 함수입니다.
    • &mut self는 이 함수가 구조체의 상태를 변경함을 나타내며, 가변 참조를 사용합니다.
  2. let value = hard_state.write_to_bytes()?:

    • hard_state 객체를 바이트 배열로 직렬화합니다. 직렬화가 실패하면 에러를 반환합니다.
  3. self.db.put("hard_state", value)?:

    • 직렬화된 데이터를 "hard_state"라는 키로 RocksDB에 저장합니다. 저장 중 에러가 발생하면 이를 처리합니다.
  4. Ok(()):

    • 모든 작업이 성공적으로 완료되면 Ok(())를 반환합니다.

이 함수는 HardState를 직렬화하여 RocksDB에 저장하는 역할을 합니다.

1. HardState란 무엇인가?

  • HardState는 Raft 알고리즘에서 리더와 팔로워 간의 일관성 유지에 중요한 메타데이터입니다. 현재 커밋된 인덱스, 투표한 후보자, 마지막으로 처리된 로그의 term 정보를 포함합니다. 이를 저장하여 시스템 재시작 시에도 현재 상태를 유지합니다.

2. HardState를 데이터베이스에 저장하는 이유?

  • HardState를 저장함으로써 재부팅 시에도 Raft의 현재 상태를 기억할 수 있습니다. 이 정보는 클러스터 간의 일관성을 유지하는 데 중요합니다.

3. self란 무엇인가?

  • self는 Rust에서 현재 객체를 가리킵니다. 여기서는 RocksDBStorage 구조체의 인스턴스를 가리킵니다.

4. 직렬화란 무엇이고 왜 해야 하는가?

  • 직렬화는 객체나 데이터를 바이트 배열로 변환하는 과정입니다. 이렇게 해야만 데이터베이스나 파일에 저장하거나 네트워크를 통해 전송할 수 있습니다. HardState를 직렬화하는 이유는 데이터를 안정적으로 저장하고 복원하기 위함입니다.

6. ConfState 읽기 및 쓰기 구현

    fn conf_state(&self) -> Result<ConfState> {
        let result = self.db.get("conf_state")?;
        match result {
            Some(data) => Ok(ConfState::decode(&*data)?),
            None => Ok(ConfState::default()),
        }
    }
  • conf_state: Raft의 ConfState를 가져오는 메서드입니다.
    • self.db.get("conf_state"): RocksDB에서 conf_state라는 키로 저장된 값을 가져옵니다.
    • Some(data) => Ok(ConfState::decode(&*data)?): 데이터를 디코딩 후 반환합니다.
    • None => Ok(ConfState::default()): conf_state가 없을 경우 기본값을 반환합니다.

1. ConfState란?

ConfState는 Raft 알고리즘에서 클러스터의 구성 상태를 나타냅니다. 즉, 어떤 노드들이 투표할 자격을 가지는지, 노드의 리스트를 유지합니다. 시스템 재시작 시 클러스터 구성을 복구하기 위해 이를 가져와야 합니다.

2. 디코딩이란?

디코딩은 직렬화된 데이터를 다시 원래의 데이터 구조로 복원하는 과정입니다. 데이터베이스에 저장된 바이트 배열을 ConfState로 변환해, 프로그램에서 사용 가능한 형태로 만드는 작업입니다.

    fn set_conf_state(&mut self, conf_state: &ConfState) -> Result<()> {
        let value = conf_state.write_to_bytes()?;
        self.db.put("conf_state", value)?;
        Ok(())
    }
  • set_conf_state: 새로운 ConfState를 설정하는 메서드입니다.
    • conf_state.write_to_bytes(): ConfState를 직렬화합니다.
    • self.db.put("conf_state", value): RocksDB에 conf_state를 저장합니다.

1. 새로운 ConfState를 설정하는 이유?

새로운 ConfState를 설정하는 이유는 클러스터의 상태 변화 때문입니다. Raft 클러스터에서는 노드 추가, 제거, 혹은 투표자 변경과 같은 이벤트가 발생할 수 있습니다. 이러한 변경 사항을 반영하려면 최신 클러스터 구성을 ConfState에 저장해야 합니다. 이 정보는 리더 선출이나 데이터 복제 등의 중요한 결정에 사용됩니다. 따라서, 안정적인 시스템 동작을 위해 ConfState를 업데이트하여 시스템이 현재 클러스터 상태를 올바르게 반영하도록 합니다.

7. 스냅샷 생성 및 적용

    fn create_snapshot(&mut self, data: Vec<u8>, index: u64, term: u64) -> Result<()> {
        let mut snapshot = Snapshot::default();
        snapshot.set_data(data);
        snapshot.mut_metadata().index = index;
        snapshot.mut_metadata().term = term;
        let value = snapshot.write_to_bytes()?;
        self.db.put("snapshot", value)?;
        Ok(())
    }
  • &mut self: 현재 구조체의 인스턴스를 가리키는 가변 참조입니다. 이로 인해 구조체의 필드를 변경할 수 있습니다.

  • data: Vec: 바이트 배열로 구성된 벡터를 나타내며, 스냅샷에 저장할 데이터를 의미합니다.

  • index: u64: 64비트 부호 없는 정수로, 스냅샷이 찍힌 시점의 인덱스를 나타냅니다.

  • term: u64: 64비트 부호 없는 정수로, 스냅샷이 찍힌 임기를 나타냅니다.

  • Result<()>: 성공 시 빈 튜플(()), 실패 시 오류를 반환하는 결과 타입입니다.

  • Snapshot::default(): 기본값을 사용하여 스냅샷을 생성하는 메서드입니다.

  • snapshot.set_data(data): 스냅샷에 데이터를 설정합니다.

  • snapshot.mut_metadata(): 메타데이터를 가변으로 접근하여 indexterm 값을 설정합니다.

  • self.db.put("snapshot", value)?: 스냅샷 데이터를 직렬화한 후 데이터베이스에 저장하는 코드입니다. "snapshot"이라는 키로 저장됩니다.

  • Ok(()): 성공적으로 작업이 완료되었음을 나타내는 반환 값입니다.

1. 가변 참조?

객체의 값을 변경할 수 있는 참조입니다. Rust에서 기본적으로 데이터는 불변(immutable)하며, 가변 참조를 통해서만 데이터를 수정할 수 있습니다. 이를 사용하는 이유는 안전하게 데이터를 수정하면서 동시에 소유권을 넘기지 않기 위해서입니다.

2. 벡터?

가변 크기의 배열을 나타내는 자료 구조입니다. 동적으로 크기를 변경할 수 있어, 데이터의 개수를 미리 알 수 없는 상황에서 유용합니다.

3. 메타데이터?

메타데이터를 사용하는 이유는 스냅샷에 중요한 정보를 포함시켜 Raft 알고리즘의 일관성과 안정성을 유지하기 위해서입니다. 메타데이터에는 스냅샷이 생성된 시점의 인덱스(index)임기(term)와 같은 중요한 정보가 포함됩니다. 이러한 정보는 다음과 같은 이유로 필수적입니다:

  1. 인덱스(index):

    • 스냅샷이 어느 시점까지의 로그를 포함하고 있는지를 나타냅니다.
    • 로그 복제 및 일치 여부를 판단할 때 사용됩니다.
    • 다른 노드들이 이 스냅샷을 수신할 때, 현재까지 적용된 로그의 범위를 정확히 알 수 있습니다.
  2. 임기(term):

    • 스냅샷이 생성된 시점의 리더의 임기(term)를 나타냅니다.
    • 로그의 일관성을 유지하고, 리더 선출 과정에서 용어 정보를 활용합니다.
    • 노드 간의 임기 비교를 통해 최신 상태를 파악할 수 있습니다.

메타데이터를 설정함으로써:

  • 데이터 일관성 유지: 노드들이 동일한 상태를 공유하고 있다는 것을 보장합니다.
  • 로그 압축 및 복구: 오래된 로그를 스냅샷으로 대체하고, 필요한 경우 스냅샷을 통해 상태를 복구할 수 있습니다.
  • 통신 효율성 향상: 전체 로그를 전송하는 대신 스냅샷과 메타데이터를 이용하여 필요한 최소한의 정보만 교환합니다.

따라서 snapshot.mut_metadata()를 통해 메타데이터에 인덱스와 임기를 설정하는 것은 Raft 프로토콜의 핵심 기능을 구현하기 위한 필수적인 과정입니다. 이것이 없으면 노드들은 스냅샷의 상태를 정확히 파악할 수 없고, 이는 클러스터의 일관성과 안정성에 문제를 초래할 수 있습니다.

이 함수는 주로 Raft 상태의 스냅샷을 생성하고 이를 데이터베이스에 저장하는 역할을 합니다.

    fn apply_snapshot(&mut self, snapshot: Snapshot) -> Result<()> {
        let metadata = snapshot.get_metadata();
        let conf_state = metadata.get_conf_state();
        self.set_conf_state(conf_state)?;
        self.set_hard_state_commit(metadata.index)?;
        self.db.put("snapshot", snapshot.write_to_bytes()?)?;
        Ok(())
    }
  • apply_snapshot: 저장된 스냅샷을 적용하는 메서드입니다.
    • metadata.get_conf_state(): 스냅샷의 ConfState를 가져옵니다.
    • self.set_conf_state(conf_state)?: 새로운 ConfState를 설정합니다.
    • self.set_hard_state_commit(metadata.index)?: HardState의 커밋 인덱스를 설정합니다.

8. 기타 메서드 - 로그 관리 및 압축

    fn compact(&mut self, index: u64) -> Result<()> {
    // Compress the data within a specific key range
    let start_key = format!("{:020}", 0);  // Start of the range
    let end_key = format!("{:020}", index);  // End of the range

    // Specify the range for compression in RocksDB
    self.db.compact_range(Some(&start_key), Some(&end_key));

    Ok(())
}

    fn all_entries(&self) -> raft::Result<Vec<Entry>> {
        let mut entries = Vec::new();
        let iter = self.db.iterator(rocksdb::IteratorMode::Start);

        for (key, value) in iter {
            let entry = Entry::decode(&*value)?;
            entries.push(entry);
        }

        Ok(entries)
    }
  • compact: 오래된 로그를 자동으로 제거하는 RocksDB의 압축 기능을 사용합니다.
  • all_entries: RocksDB에 저장된 모든 로그 엔트리를 반환합니다.

9. Storage 트레이트 구현

impl Storage for RocksDBStorage {
    fn initial_state(&self) -> raft::Result<RaftState> {
        let hard_state = self.hard_state()?;
        let conf_state = self.conf_state()?;
        Ok(RaftState {
            hard_state,
            conf_state,
        })
    }
  • initial_state: Raft 노드의 초기 상태를 반환합니다. HardStateConfState로 구성됩니다.

1. Raft 노드의 초기 상태를 반환하는 이유?

Raft 노드의 초기 상태를 반환하는 이유는 Raft 알고리즘의 정상적인 시작을 위해서입니다. 노드가 재시작되거나 새로운 클러스터에 참여할 때, 이전에 저장된 상태인 HardState와 ConfState를 기반으로 다시 동작을 시작해야 합니다. HardState는 마지막으로 커밋된 로그 인덱스 및 리더 정보 등 중요한 정보를 담고 있으며, ConfState는 현재 클러스터 구성원을 정의합니다. 이를 통해 클러스터의 연속성을 유지하고 안정적으로 운영할 수 있습니다.

    fn entries(&self, low: u64, high: u64, max_size: impl Into<Option<u64>>, ctx: GetEntriesContext) -> raft::Result<Vec<Entry>> {
        let mut entries = Vec::new();
        for i in low..high {
            let key = format!("{:020}", i);
            if let Some(value) = self.db.get(key)? {
                let entry = Entry::decode(&*value)?;
                entries.push(entry);
            }
        }
        limit_size(&mut entries, max_size.into());
        Ok(entries)
    }
  • entries: 지정된 범위의 로그 엔트리를 반환합니다.
    fn term(&self, idx: u64) -> raft::Result<u64> {
        let key = format!("{:020}", idx);
        if let Some(value) = self.db.get(key)? {
            let entry = Entry::decode(&*value)?;
            Ok(entry.term)
        } else {
            Err(raft::Error::Store(raft::StorageError::Unavailable))
        }
    }
  • term: 특정 인덱스의 로그 엔트리의 term(임기)을 반환합니다.
    fn first_index(&self) -> raft::Result<u64> {
        let iter = self.db.iterator(rocksdb::IteratorMode::Start);
        match iter.next() {
            Some((key, _)) => {
                let idx = String::from_utf8(key.to_vec())?.parse::<u64>()?;
                Ok(idx)
            }
            None => Ok(0),
        }
    }

    fn last_index(&self) -> raft::Result<u64> {
        let iter = self.db.iterator(rocksdb::IteratorMode::End);
        match iter.next() {
            Some((key, _)) => {
                let idx = String::from_utf8(key.to_vec())?.parse::<u64>()?;
                Ok(idx)
            }
            None => Ok(0),
        }
    }
  • first_indexlast_index: 저장된 로그 엔트리의 첫 번째 및 마지막 인덱스를 반환합니다.
    fn snapshot(&self, request_index: u64, to: u64) -> raft::Result<Snapshot> {
        if let Some(value) = self.db.get("snapshot")? {
            let snapshot = Snapshot::decode(&*value)?;
            Ok(snapshot)
        } else {
            Err(raft::Error::Store(raft::StorageError::SnapshotTemporarilyUnavailable))
        }
    }
}
  • snapshot: 저장된 스냅샷을 반환합니다.

10. 테스트 모듈

#[cfg(test)]
mod test {
    use super::*;
    use rocksdb::{Options, DB};
    use std::{fs, sync::Arc};
    use tempfile::tempdir;
    use crate::raft::{
        default_logger,
        eraftpb::{Entry, Snapshot},
        logger::Slogger,
        Config as RaftConfig, Error as RaftError, GetEntriesContext, Storage, StorageError,
    };
    use prost::Message;
  1. #[cfg(test)]: 이 속성은 테스트 모듈을 컴파일하고 실행할 때만 포함되도록 설정합니다. 즉, 코드가 테스트할 때만 이 모듈이 컴파일됩니다.

  2. mod test: "test"라는 이름의 모듈을 정의합니다. 이 모듈 내부에는 테스트 케이스를 작성할 수 있습니다.

  3. use super::*;: 현재 모듈의 부모 모듈에 있는 모든 항목을 가져옵니다. 이는 test 모듈에서 부모 모듈의 코드를 재사용할 수 있게 합니다.

  4. use rocksdb::{Options, DB};: RocksDB 라이브러리에서 OptionsDB 객체를 가져옵니다. 이는 RocksDB의 데이터베이스를 설정하고 접근하는 데 사용됩니다.

  5. use std::{fs, sync::Arc};: 표준 라이브러리의 fs(파일 시스템 관련 기능)와 sync::Arc(원자적 참조 카운팅 포인터)를 가져옵니다. Arc는 여러 스레드에서 참조 가능한 데이터를 관리할 때 사용됩니다.

  6. use tempfile::tempdir;: 임시 디렉토리를 생성하는 데 사용됩니다. 테스트가 끝나면 자동으로 삭제되는 임시 저장소입니다.

  7. use crate::raft::{default_logger, eraftpb::{Entry, Snapshot}, logger::Slogger, Config as RaftConfig, Error as RaftError, GetEntriesContext, Storage, StorageError};: raft 모듈의 여러 요소를 가져옵니다. Raft 프로토콜과 관련된 엔트리, 스냅샷, 에러 처리, 로깅 기능을 포함합니다.

  8. use prost::Message;: Protocol Buffers로 직렬화 및 역직렬화하는 데 사용됩니다.

1. 부모 모듈에 있는 모든 항목을 가져오는 이유:

use super::*; 구문은 테스트 모듈에서 부모 모듈의 모든 항목을 재사용할 수 있게 합니다. 이 방법을 통해 부모 모듈의 함수, 구조체, 상수 등을 개별적으로 다시 정의하지 않고도 테스트할 수 있습니다. 이는 코드 중복을 줄이고, 일관성을 유지하며, 테스트 코드가 부모 모듈의 로직과 직접적으로 연관되도록 하기 위함입니다.

2. Protocol Buffers를 사용하는 이유:

Protocol Buffers(또는 prost::Message)는 데이터를 직렬화할 때 사용됩니다. 이는 빠르고 효율적인 방식으로 구조화된 데이터를 네트워크로 전송하거나 파일에 저장할 수 있게 해줍니다. Raft처럼 네트워크 통신이 중요한 분산 시스템에서는 데이터를 작은 크기로 직렬화하여 빠르게 전송하는 것이 중요하므로 Protocol Buffers가 유용합니다.


1) 유틸리티 함수 정의

fn new_entry(index: u64, term: u64) -> Entry {
    let mut e = Entry::default();
    e.term = term;
    e.index = index;
    e
}
  • new_entry: 로그 엔트리(Entry)를 생성하는 유틸리티 함수입니다. 인덱스와 term 값을 설정한 후 기본 엔트리를 반환합니다.
fn size_of<T: Message>(m: &T) -> u32 {
    m.encoded_len() as u32
}
  • size_of: prost::Message 타입의 메시지를 인코딩했을 때 그 크기를 반환하는 함수입니다.
fn new_snapshot(index: u64, term: u64, voters: Vec<u64>) -> Snapshot {
    let mut s = Snapshot::default();
    s.mut_metadata().index = index;
    s.mut_metadata().term = term;
    s.mut_metadata().mut_conf_state().voters = voters;
    s
}
  • new_snapshot: 인덱스, term, voters 값을 설정한 새로운 스냅샷을 생성하는 함수입니다.

2) 테스트 환경 설정 및 해제 함수

fn setup() -> (RocksDBStorage, String) {
    let tempdir = tempdir().expect("Failed to create temporary directory");
    let path = tempdir.path().to_str().unwrap().to_owned();

    let logger = Arc::new(Slogger {
        slog: default_logger(),
    });

    let storage = RocksDBStorage::create(&path, logger).expect("Failed to create RocksDB storage");

    (storage, path)
}

fn teardown(path: String) {
    fs::remove_dir_all(path).expect("Failed to delete test directory");
}
  • setup: 임시 디렉토리를 생성하여 RocksDBStorage를 초기화하고, 경로와 인스턴스를 반환하는 함수입니다. 이 함수는 테스트 환경을 설정하는데 사용됩니다.
  • teardown: 테스트가 끝난 후 임시 디렉토리를 제거하여 환경을 정리하는 함수입니다.
  • expect는 Rust에서 오류 처리를 단순하게 하기 위한 메서드입니다. 값이 Ok일 경우 해당 값을 반환하지만, Err일 경우 프로그램이 즉시 패닉 상태로 빠지며 expect에 전달한 메시지를 출력합니다. 이를 통해 코드가 실패할 때, 좀 더 명확한 오류 메시지를 제공할 수 있습니다.

3) 로그 엔트리의 term 검증 테스트

#[test]
fn test_storage_term() {
    let (mut storage, path) = setup();

    let ents = vec![new_entry(3, 3), new_entry(4, 4), new_entry(5, 5)];
    let mut tests = vec![
        (2u64, Err(RaftError::Store(StorageError::Compacted))),
        (3u64, Ok(3)),
        (4u64, Ok(4)),
        (5u64, Ok(5)),
        (6u64, Err(RaftError::Store(StorageError::Unavailable))),
    ];

    storage.append(&ents).expect("Failed to append entries");

    for (i, (idx, wterm)) in tests.drain(..).enumerate() {
        let t = storage.term(idx);
        if t != wterm {
            panic!("#{}: expect res {:?}, got {:?}", i, wterm, t);
        }
    }

    teardown(path);
}
#[test]
fn test_storage_term() {
    let (mut storage, path) = setup();
  1. setup() 호출: 테스트 환경을 준비합니다. RocksDB 인스턴스를 초기화하고, 로그 저장소(storage)와 임시 디렉토리 경로(path)를 반환합니다.
    let ents = vec![new_entry(3, 3), new_entry(4, 4), new_entry(5, 5)];
  1. 로그 엔트리 생성: new_entry 함수로 임기(term)와 인덱스가 설정된 Raft 로그 엔트리(Entry)를 벡터로 만듭니다. 여기서는 3, 3, 4, 4, 5, 5가 포함됩니다.
    let mut tests = vec![
        (2u64, Err(RaftError::Store(StorageError::Compacted))),
        (3u64, Ok(3)),
        (4u64, Ok(4)),
        (5u64, Ok(5)),
        (6u64, Err(RaftError::Store(StorageError::Unavailable))),
    ];
  1. 테스트 케이스 설정: 각 인덱스에 대해 예상되는 결과를 담은 테스트 벡터를 만듭니다. 인덱스 2는 압축된 로그(Compacted), 3~5는 각각의 임기를 반환하며, 6은 로그가 존재하지 않아 오류(Unavailable)를 반환해야 합니다.
    storage.append(&ents).expect("Failed to append entries");
  1. 로그 추가: RocksDB에 ents 벡터에 있는 로그 엔트리들을 추가합니다.
    for (i, (idx, wterm)) in tests.drain(..).enumerate() {
        let t = storage.term(idx);
        if t != wterm {
            panic!("#{}: expect res {:?}, got {:?}", i, wterm, t);
        }
    }
  1. 테스트 실행: 각 인덱스에 대해 storage.term(idx)로 실제 결과를 확인한 후, 예상 결과(wterm)와 비교합니다. 예상과 다르면 panic!으로 테스트를 실패 처리합니다.
    teardown(path);
}
  1. 정리: teardown(path)를 호출해 임시 디렉토리와 파일을 삭제합니다.

4) 로그 엔트리 목록 반환 테스트

#[test]
fn test_storage_entries() {
    let (mut storage, path) = setup();

    let ents = vec![
        new_entry(3, 3),
        new_entry(4, 4),
        new_entry(5, 5),
        new_entry(6, 6),
    ];
    let max_u64 = u64::max_value();
    let mut tests = vec![
        (
            2,
            6,
            max_u64,
            Err(RaftError::Store(StorageError::Compacted)),
        ),
        (3, 4, max_u64, Ok(vec![new_entry(3, 3)])),
        (4, 5, max_u64, Ok(vec![new_entry(4, 4)])),
        (4, 6, max_u64, Ok(vec![new_entry(4, 4), new_entry(5, 5)])),
        (
            4,
            7,
            max_u64,
            Ok(vec![new_entry(4, 4), new_entry(5, 5), new_entry(6, 6)]),
        ),
        (4, 7, 0, Ok(vec![new_entry(4, 4)])),
        (
            4,
            7,
            u64::from(size_of(&ents[1]) + size_of(&ents[2])),
            Ok(vec![new_entry(4, 4), new_entry(5, 5)]),
        ),
        (
            4,
            7,
            u64::from(size_of(&ents[1]) + size_of(&ents[2]) + size_of(&ents[3]) / 2),
            Ok(vec![new_entry(4, 4), new_entry(5, 5)]),
        ),
        (
            4,
            7,
            u64::from(size_of(&ents[1]) + size_of(&ents[2]) + size_of(&ents[3]) - 1),
            Ok(vec![new_entry(4, 4), new_entry(5, 5)]),
        ),
        (
            4,
            7,
            u64::from(size_of(&ents[1]) + size_of(&ents[2]) + size_of(&ents[3])),
            Ok(vec![new_entry(4, 4), new_entry(5, 5), new_entry(6, 6)]),
        ),
    ];

    storage.append(&ents).expect("Failed to append entries");

    for (i, (lo, hi, maxsize, wentries)) in tests.drain(..).enumerate() {
        let e = storage.entries(lo, hi, maxsize, GetEntriesContext::empty(false));
        if e != wentries {
            panic!("#{}: expect entries {:?}, got {:?}", i, wentries, e);
        }
    }

    teardown(path);
}
#[test]
fn test_storage_entries() {
    let (mut storage, path) = setup();
  1. setup() 호출: RocksDB 저장소를 초기화하고 테스트 환경을 구성합니다.
let ents = vec![
    new_entry(3, 3),
    new_entry(4, 4),
    new_entry(5, 5),
    new_entry(6, 6),
];
  1. 테스트 데이터 생성: new_entry 함수로 임기(term)와 인덱스가 설정된 Raft 로그 엔트리(Entry) 4개를 생성합니다.
let max_u64 = u64::max_value();
  1. max_u64 설정: 테스트에서 사용할 최대 크기 값을 설정합니다. 이는 u64 자료형의 최대값입니다.
let mut tests = vec![
    // 테스트 케이스 설정
];
  1. 테스트 케이스 설정: 각 테스트 케이스는 (lo, hi, maxsize, expected) 형태로 구성됩니다. 여기서 lohi는 조회할 인덱스 범위, maxsize는 반환할 최대 크기, expected는 예상되는 결과입니다. 각 케이스는 다른 조건을 테스트합니다.
storage.append(&ents).expect("Failed to append entries");
  1. 로그 엔트리 추가: 테스트할 엔트리들을 RocksDB에 저장합니다.
for (i, (lo, hi, maxsize, wentries)) in tests.drain(..).enumerate() {
    let e = storage.entries(lo, hi, maxsize, GetEntriesContext::empty(false));
    if e != wentries {
        panic!("#{}: expect entries {:?}, got {:?}", i, wentries, e);
    }
}
  1. 테스트 실행: 각 테스트 케이스에서 storage.entries(lo, hi, maxsize, ...)로 로그 엔트리들을 조회하고, 예상된 엔트리(wentries)와 비교합니다. 불일치가 있으면 panic!으로 오류를 출력하며 테스트 실패를 알립니다.
teardown(path);
  1. 정리: 테스트가 끝나면 임시 디렉토리와 데이터를 삭제하여 깨끗한 환경을 유지합니다.

5) 최종 로그 인덱스 검증 테스트

#[test]
fn test_storage_last_index() {
    let (mut storage, path) = setup();

    let ents = vec![new_entry(3, 3), new_entry(4, 4), new_entry(5, 5)];
    storage.append(&ents).expect("Failed to append entries");

    let wresult = Ok(5);
    let result = storage.last_index();
    if result != wresult {
        panic!("want {:?}, got {:?}", wresult, result);
    }

    storage.append([new_entry(6, 5)].as_ref()).expect("Failed to append entry");
    let wresult = Ok(6);
    let result = storage.last_index();
    if result != wresult {
        panic!("want {:?}, got {:?}", wresult, result);
    }

    teardown(path);
}
#[test]
fn test_storage_last_index() {
    let (mut storage, path) = setup();
  1. setup() 호출: RocksDB 스토리지를 초기화하고 임시 파일 경로를 설정합니다. 테스트 환경을 준비하는 단계입니다.
let ents = vec![new_entry(3, 3), new_entry(4, 4), new_entry(5, 5)];
storage.append(&ents).expect("Failed to append entries");
  1. 로그 엔트리 추가: 3개의 Raft 로그 엔트리를 storage.append()로 RocksDB에 저장합니다.
let wresult = Ok(5);
let result = storage.last_index();
if result != wresult {
    panic!("want {:?}, got {:?}", wresult, result);
}
  1. 마지막 인덱스 확인: storage.last_index()를 호출하여 저장된 마지막 로그 인덱스가 5인지 확인합니다. 예상 값(wresult)과 실제 값(result)을 비교해 일치하지 않으면 테스트 실패를 알립니다.
storage.append([new_entry(6, 5)].as_ref()).expect("Failed to append entry");
let wresult = Ok(6);
let result = storage.last_index();
if result != wresult {
    panic!("want {:?}, got {:?}", wresult, result);
}
  1. 새로운 엔트리 추가 및 검증: 인덱스 6, 임기 5의 엔트리를 추가한 후 다시 storage.last_index()로 마지막 인덱스를 6으로 확인합니다. 이 값도 검증하여 일치하지 않으면 오류를 발생시킵니다.
teardown(path);
  1. 정리 작업: teardown()을 호출하여 테스트 환경을 종료하고 임시로 사용한 리소스를 정리합니다.

6) 첫 번째 로그 인덱스 검증 테스트

#[test]
fn test_storage_first_index() {
    let (mut storage, path) = setup();

    let ents = vec![new_entry(3, 3), new_entry(4, 4), new_entry(5, 5)];
    storage.append(&ents).expect("Failed to append entries");

    assert_eq!(storage.first_index(), Ok(3));
    storage.compact(4).expect("Failed to compact");
    assert_eq!(storage.first_index(), Ok(4));

    teardown(path);
}
  • test_storage_first_index: 저장된 엔트리에서 첫 번째 인덱스를 반환하는 first_index 메서드를 테스트합니다. 엔트리를 압축한 후에도 정상적으로 첫 번째 인덱스가 업데이트되는지 확인합니다.

7) 로그 압축 테스트

    fn test_storage_compact() {
        let (mut storage, path) = setup();

        let ents = vec![new_entry(3, 3), new_entry(4, 4), new_entry(5, 5)];
        let mut tests = vec![(2, 3, 3, 3), (3, 3, 3, 3), (4, 4, 4, 2), (5, 5, 5, 1)];
        for (i, (idx, windex, wterm, wlen)) in tests.drain(..).enumerate() {
            storage.append(&ents).expect("Failed to append entries");

            storage.compact(idx).expect("Failed to compact");
            let index = storage.first_index().expect("Failed to get first index");
            if index != windex {
                panic!("#{}: want {}, index {}", i, windex, index);
            }
            let term = if let Ok(v) =
                storage.entries(index, index + 1, 1, GetEntriesContext::empty(false))
            {
                v.first().map_or(0, |e| e.term)
            } else {
                0
            };
            if term != wterm {
                panic!("#{}: want {}, term {}", i, wterm, term);
            }
            let last = storage.last_index().expect("Failed to get last index");
            let len = storage
                .entries(index, last + 1, 100, GetEntriesContext::empty(false))
                .expect("Failed to get entries")
                .len();
            if len != wlen {
                panic!("#{}: want {}, term {}", i, wlen, len);
            }
        }

        teardown(path);
    }
#[test]
fn test_storage_compact() {
    let (mut storage, path) = setup();
  1. setup() 호출: RocksDB 스토리지를 초기화하고 테스트를 위한 임시 디렉토리를 설정합니다.
let ents = vec![new_entry(3, 3), new_entry(4, 4), new_entry(5, 5)];
  1. 엔트리 추가: 인덱스 3에서 5까지의 엔트리를 추가합니다.
let mut tests = vec![(2, 3, 3, 3), (3, 3, 3, 3), (4, 4, 4, 2), (5, 5, 5, 1)];
  1. 테스트 케이스 설정: 각 테스트 케이스는 compact() 호출 후의 기대 결과를 나타냅니다. 여기서 windex, wterm, wlen은 기대값입니다.
for (i, (idx, windex, wterm, wlen)) in tests.drain(..).enumerate() {
    storage.append(&ents).expect("Failed to append entries");

    storage.compact(idx).expect("Failed to compact");
  1. compact() 호출: 각 테스트에서 지정된 인덱스까지의 로그를 압축합니다.
let index = storage.first_index().expect("Failed to get first index");
if index != windex {
    panic!("#{}: want {}, index {}", i, windex, index);
}
  1. 첫 번째 인덱스 검증: first_index()를 호출하여 압축 후 첫 번째 인덱스를 검증합니다. 기대값(windex)과 일치하지 않으면 패닉을 발생시킵니다.
let term = if let Ok(v) = storage.entries(index, index + 1, 1, GetEntriesContext::empty(false)) {
    v.first().map_or(0, |e| e.term)
} else {
    0
};
if term != wterm {
    panic!("#{}: want {}, term {}", i, wterm, term);
}
  1. term 값 검증: entries()를 호출해 해당 인덱스의 로그 엔트리를 확인하고, term 값이 예상된 값(wterm)과 일치하는지 확인합니다.
let last = storage.last_index().expect("Failed to get last index");
let len = storage.entries(index, last + 1, 100, GetEntriesContext::empty(false))
    .expect("Failed to get entries")
    .len();
if len != wlen {
    panic!("#{}: want {}, len {}", i, wlen, len);
}
  1. 로그 길이 검증: 압축 후 로그의 길이(len)가 기대값(wlen)과 일치하는지 확인합니다.
teardown(path);
  1. 테스트 환경 정리: teardown()을 호출하여 테스트 환경을 정리합니다.

8) 스냅샷 적용 테스트

#[test]
fn test_storage_apply_snapshot() {
    let (mut storage, path) = setup();

    let nodes = vec![1, 2, 3];  // 스냅샷에 포함된 노드들

    // 스냅샷을 생성하고 적용
    let snap = new_snapshot(4, 4, nodes.clone());
    storage.apply_snapshot(snap).expect("Failed to apply snapshot");

    teardown(path);
}
#[test]
fn test_storage_compact() {
    let (mut storage, path) = setup();
  1. setup() 호출: RocksDB 스토리지를 초기화하고 테스트를 위한 임시 디렉토리를 설정합니다.
let ents = vec![new_entry(3, 3), new_entry(4, 4), new_entry(5, 5)];
  1. 엔트리 추가: 인덱스 3에서 5까지의 엔트리를 추가합니다.
let mut tests = vec![(2, 3, 3, 3), (3, 3, 3, 3), (4, 4, 4, 2), (5, 5, 5, 1)];
  1. 테스트 케이스 설정: 각 테스트 케이스는 compact() 호출 후의 기대 결과를 나타냅니다. 여기서 windex, wterm, wlen은 기대값입니다.
for (i, (idx, windex, wterm, wlen)) in tests.drain(..).enumerate() {
    storage.append(&ents).expect("Failed to append entries");

    storage.compact(idx).expect("Failed to compact");
  1. compact() 호출: 각 테스트에서 지정된 인덱스까지의 로그를 압축합니다.
let index = storage.first_index().expect("Failed to get first index");
if index != windex {
    panic!("#{}: want {}, index {}", i, windex, index);
}
  1. 첫 번째 인덱스 검증: first_index()를 호출하여 압축 후 첫 번째 인덱스를 검증합니다. 기대값(windex)과 일치하지 않으면 패닉을 발생시킵니다.
let term = if let Ok(v) = storage.entries(index, index + 1, 1, GetEntriesContext::empty(false)) {
    v.first().map_or(0, |e| e.term)
} else {
    0
};
if term != wterm {
    panic!("#{}: want {}, term {}", i, wterm, term);
}
  1. term 값 검증: entries()를 호출해 해당 인덱스의 로그 엔트리를 확인하고, term 값이 예상된 값(wterm)과 일치하는지 확인합니다.
let last = storage.last_index().expect("Failed to get last index");
let len = storage.entries(index, last + 1, 100, GetEntriesContext::empty(false))
    .expect("Failed to get entries")
    .len();
if len != wlen {
    panic!("#{}: want {}, len {}", i, wlen, len);
}
  1. 로그 길이 검증: 압축 후 로그의 길이(len)가 기대값(wlen)과 일치하는지 확인합니다.
teardown(path);
  1. 테스트 환경 정리: teardown()을 호출하여 테스트 환경을 정리합니다.

9) 최종 테스트 요약

이 모듈은 RocksDB를 기반으로 한 Raft 저장소의 주요 기능을 테스트합니다. 각 테스트는 다음을 확인합니다:
1. 로그 엔트리 추가: 로그가 정상적으로 추가되는지.
2. term 값 확인: 특정 인덱스에서 올바른 term 값이 반환되는지.
3. 엔트리 범위 반환: 지정된 범위의 엔트리 목록이 정확히 반환되는지.
4. 압축 기능: 압축이 수행된 후의 상태가 올바른지.
5. 스냅샷 적용: 스냅샷 적용 과정에서 상태가 적절히 업데이트되는지.

cf) 테스트 건너뛰기 cargo test -- --skip test_dynamic_bootstrap --skip test_static_bootstrap --skip test_data_replication --skip test_leader_election_in_three_node_example --skip test_storage_last_index --skip test_storage_first_index --skip test_storage_apply_snapshot --skip test_storage_term --skip test_storage_compact --skip test_storage_append --skip test_storage_entries

profile
터널을 지나고 있을 뿐, 길은 여전히 열려 있다.

0개의 댓글