Rocks DB 스토리지 구현 3)

Tasker_Jang·2024년 9월 21일
0

개발을 하다 보면 예상치 못한 에러를 만나는 일이 자주 있습니다. 이번 프로젝트에서도 몇 가지 에러가 발생했고, 이를 해결하는 과정에서 많은 것을 배우게 되었습니다. 이 글에서는 코드 작성 중에 직면했던 에러들과 그 해결 과정을 공유하려고 합니다.

에러를 처음 마주쳤을 때는 당황스럽기도 하고 해결 방법을 몰라 막막하기도 했지만, 차근차근 문제의 원인을 파악하고 다양한 시도를 통해 해결할 수 있었습니다. 제가 겪었던 시행착오와 해결 방법들이 비슷한 문제를 겪고 있는 분들에게 작은 도움이 되기를 바랍니다.

1. use 문을 통한 모듈 임포트

  • crate::raft::eraftpb::Entry: Raft 프로토콜에서 사용되는 로그 엔트리를 정의한 모듈입니다. Raft 로그 엔트리는 노드 간 복제를 위해 필수적입니다.
  • crate::raft::logger::Logger: 로그 기록을 위한 모듈로, Raft의 실행 과정을 추적하기 위해 사용됩니다.
  • crate::raft::prelude::{ConfState, HardState, Snapshot}: Raft의 상태 정보와 관련된 구조체들입니다. ConfState는 클러스터의 구성 상태, HardState는 Raft의 하드 상태, Snapshot은 Raft의 스냅샷을 나타냅니다.
  • crate::raft::util::limit_size: 유틸리티 함수로, Raft 메시지 또는 로그 엔트리의 크기를 제한하는 데 사용됩니다.
  • crate::raft::{GetEntriesContext, RaftState, Storage}: Raft 상태와 스토리지 관련 구조체들로, 로그 엔트리 조회 및 저장소에 관련된 기능을 담당합니다.
  • crate::{Result, StableStorage}: Result는 에러 처리에 사용되며, StableStorage는 안정적인 스토리지 인터페이스를 제공합니다.
  • prost::Message: 프로토콜 버퍼 메시지를 처리하는 데 사용되는 prost 라이브러리입니다.
  • rocksdb::{Options, DB}: RocksDB는 고성능 키-값 스토리지 엔진으로, 여기서는 Raft의 로그와 상태를 저장하는 데 사용됩니다.
  • std::sync::Arc: Rust의 멀티스레딩 환경에서 참조 카운팅을 통한 안전한 공유 메모리를 제공하는 구조체입니다.
    use 문을 수정한 코드에서 몇 가지 변화가 보입니다. 위에서 불필요한 모듈을 제거하고 필요한 모듈만 남기면서 코드가 좀 더 간결해졌습니다. 이 과정에서 발생할 수 있는 몇 가지 중요한 사항들을 확인해볼 수 있습니다.

변경된 use 문 분석

  • use rocksdb::{DB, Options};: RocksDB는 여전히 사용 중입니다. 이는 Raft 로그와 상태를 RocksDB에 저장하거나 읽어오는 데 사용됩니다.
  • use crate::{StableStorage, Result, Entry};: 여기서는 crate 내에 정의된 모듈들을 가져옵니다. StableStorage는 안정적인 저장소 인터페이스, Result는 에러 처리를 위한 타입, Entry는 Raft 로그 엔트리와 관련된 구조체입니다.
  • use std::sync::Arc;: 여전히 Arc는 멀티스레딩 환경에서 안전하게 데이터 공유를 위해 사용됩니다.
  • use raft::logger::Logger;: Logger는 Raft 프로토콜의 실행 과정과 이벤트를 기록하는 데 사용됩니다. 이는 디버깅 및 성능 모니터링에 중요합니다.

2. RocksDBStorage 생성 함수

impl RocksDBStorage {
    pub fn create(path: &str, logger: Arc<dyn Logger>) -> Result<Self> {
        let mut opts = Options::default();
        opts.create_if_missing(true);
        let db = DB::open(&opts, path).unwrap();
        Ok(RocksDBStorage { db, logger })
    }
}

DB::open(&opts, path).unwrap(): unwrap()은 Result 타입을 처리할 때 성공 시 값을 반환하지만, 실패 시 프로그램을 즉시 중단시키고 패닉을 일으킵니다. 이 방식은 간단하지만, 에러가 발생했을 때 프로그램이 종료될 수 있는 위험이 있습니다.

기존 코드에서 ? 연산자를 사용할 때 에러가 발생하고, unwrap()을 사용한 코드는 정상적으로 동작하는 이유는 주로 에러 타입 불일치에러 처리 방식에 있을 수 있습니다.

1) ? 연산자의 동작 원리

  • ?ResultOption 타입을 반환하는 함수에서 사용될 수 있습니다. 만약 DB::open(&opts, path)Result<T, E>를 반환할 때, 함수 전체에서 같은 Result 타입으로 에러를 처리할 수 있어야 합니다.
  • 즉, ?를 사용하는 함수에서는 반환 타입이 Result<T, E>여야 하고, 반환되는 에러 타입 E가 호출 함수와 일치해야 합니다. 그렇지 않으면 에러가 발생할 수 있습니다.

2) 에러가 발생한 이유

  • ? 연산자를 사용한 기존 코드에서 에러가 발생한 이유는 반환되는 에러 타입이 일치하지 않았을 가능성이 큽니다. 예를 들어, DB::open 함수에서 반환하는 에러 타입이 std::io::Error 또는 rocksdb::Error인데, 함수가 Result<Self, SomeOtherError> 타입을 반환하도록 설계되어 있다면 에러 타입 불일치로 인해 컴파일 에러가 발생할 수 있습니다.
  • 이 경우, 두 가지 해결 방법이 있습니다:
    1. 에러 타입을 변환: map_err를 사용하여 에러 타입을 호출 함수의 에러 타입에 맞게 변환할 수 있습니다.
    2. 에러 타입을 일치: 함수가 반환하는 Result 타입을 DB::open 함수의 에러 타입과 일치시키는 방법이 있습니다.

3) unwrap()이 정상적으로 동작하는 이유

  • unwrap()은 에러를 처리하는 대신, 에러가 발생하면 프로그램이 패닉 상태로 종료되도록 합니다. 따라서 에러 타입과는 상관없이 바로 값을 꺼낼 수 있기 때문에 컴파일 타임에서는 문제가 발생하지 않습니다.
  • 그러나, 이는 에러 발생 시 프로그램이 비정상적으로 종료될 수 있으므로 안전한 방법은 아닙니다.

3. StableStorage 트레이트 구현


impl StableStorage for RocksDBStorage {
    fn append(&mut self, entries: &[Entry]) -> Result<()> {
        // Logic to store logs in RocksDB
        for entry in entries {
            let key = format!("{:020}", entry.index); // Use index as key
            let value = entry.encode_to_vec();
            self.db.put(key, value).unwrap();
        }
        Ok(())
    }
    

네, entry.encode_to_vec() 부분이 기존의 entry.write_to_bytes()?에서 변경된 것도 중요한 차이점입니다. 이 변경 사항을 포함한 전체적인 차이점을 설명드리겠습니다.

1) 기존 코드 (write_to_bytes):

let value = entry.write_to_bytes()?;
  • write_to_bytes(): 이 함수는 prost::Message 트레이트에 있는 메서드로, 프로토콜 버퍼 메시지를 바이트 배열로 직렬화합니다. 직렬화 과정에서 에러가 발생할 수 있으므로 Result 타입을 반환하며, 에러가 발생하면 이를 상위 함수로 전파할 수 있습니다.
    • 장점: 에러 발생 시 적절히 처리할 수 있으며, 호출한 함수에서 에러를 인식하고 처리할 수 있습니다.
    • 에러 핸들링: 에러를 ?를 통해 전파하여 호출자에게 처리 기회를 제공합니다.

2) 변경된 코드 (encode_to_vec):

let value = entry.encode_to_vec();
  • encode_to_vec(): 이 메서드는 prost::Message의 또 다른 메서드로, 메시지를 바로 Vec<u8>로 직렬화하여 반환합니다. 이 함수는 에러를 반환하지 않으며 대신 직렬화를 시도한 후 결과를 Vec<u8>로 반환합니다.
    • 장점: 간단한 사용 방식으로, 에러를 따로 처리할 필요 없이 바로 직렬화된 데이터를 받을 수 있습니다.
    • 단점: 에러가 발생할 가능성이 있지만, 이 메서드는 에러를 반환하지 않기 때문에 직렬화 실패 시 이를 인지하거나 처리하기 어렵습니다. 따라서 내부적으로 직렬화가 실패하면 어떻게 처리되는지 명확하지 않습니다.

3) 차이점:

  • 에러 처리 방식:
    • write_to_bytes()는 직렬화 도중 에러가 발생할 수 있기 때문에, 이를 Result로 처리할 수 있도록 설계되어 있습니다. 이로 인해 에러 발생 시 코드에서 이를 처리할 기회를 제공받습니다.
    • encode_to_vec()는 에러가 발생하지 않는다고 가정하고, 바로 직렬화된 Vec<u8> 값을 반환하므로 에러 핸들링이 없습니다. 간결하지만, 직렬화 실패 시에 대한 보호 장치가 없는 상태입니다.
  • 안전성:
    • write_to_bytes()? 방식은 에러를 상위 함수로 전파할 수 있어 보다 안전합니다. 에러가 발생하면 즉시 프로그램이 종료되지 않고, 호출 함수에서 이를 처리할 수 있습니다.
    • encode_to_vec() 방식은 직렬화 실패 가능성에 대한 보호가 없기 때문에 안정성이 떨어질 수 있습니다. 특히 직렬화가 실패할 가능성이 있는 복잡한 데이터 구조를 다룰 때는 이 방식이 위험할 수 있습니다.

4) 권장 사항:

  • 안정성을 원한다면: 에러 처리가 중요한 경우 write_to_bytes()?를 사용하는 것이 더 안전합니다. 이 방식은 직렬화 실패 시 프로그램이 비정상적으로 종료되지 않고, 에러를 상위로 전파하여 처리할 수 있습니다.
  • 간결성을 원한다면: 만약 직렬화가 반드시 성공한다고 가정할 수 있는 상황에서는 encode_to_vec()를 사용할 수 있습니다. 이 경우 코드가 간결해지지만, 직렬화 실패 시 프로그램의 동작을 보장하기 어렵습니다.

4. 새롭게 추가된 함수

두 개의 새로운 함수가 추가되었습니다: set_hard_stateset_hard_state_commit. 각각의 함수는 Raft의 HardState를 저장하고 업데이트하는 기능을 제공합니다. 이 두 함수의 동작과 그 역할에 대해 설명드리겠습니다.

1) set_hard_state 함수

fn set_hard_state(&mut self, hard_state: &HardState) -> Result<()> {
    let value = hard_state.encode_to_vec();              // HardState를 직렬화하여 Vec<u8>로 변환
    self.db.put("hard_state", value).unwrap();           // RocksDB에 "hard_state" 키로 저장
    Ok(())
}

설명:

  • 이 함수는 HardState를 RocksDB에 저장하는 역할을 합니다.
  • 직렬화: hard_state.encode_to_vec()를 사용하여 HardState 객체를 바이너리 데이터(Vec<u8>)로 직렬화합니다.
  • 저장: self.db.put("hard_state", value)는 직렬화된 데이터를 "hard_state"라는 키를 사용해 RocksDB에 저장합니다.
  • 에러 처리에서 unwrap()을 사용하고 있으며, 만약 db.put()에서 에러가 발생하면 프로그램이 패닉에 빠집니다. 에러 전파가 필요하다면 ?로 바꾸는 것이 좋습니다.

2) set_hard_state_commit 함수

fn set_hard_state_commit(&mut self, commit: u64) -> Result<()> {
    let mut hard_state = self.hard_state()?;             // 현재 저장된 HardState를 가져옴
    hard_state.set_commit(commit);                       // commit 값을 업데이트
    self.set_hard_state(&hard_state)                     // 업데이트된 HardState를 다시 저장
}

설명:

  • 이 함수는 HardStatecommit 값을 업데이트하는 역할을 합니다.
  • 가져오기: self.hard_state()?를 호출하여 현재 저장된 HardState 객체를 가져옵니다. 이때 ?는 에러가 발생할 경우 에러를 호출한 쪽으로 전파합니다.
  • 업데이트: hard_state.set_commit(commit)를 호출하여 가져온 HardStatecommit 값을 수정합니다.
  • 저장: 수정된 HardStateself.set_hard_state(&hard_state)를 통해 다시 RocksDB에 저장됩니다.

두 함수의 관계:

  • set_hard_stateHardState 객체를 RocksDB에 직접 저장하는 함수입니다.
  • set_hard_state_commitHardStatecommit 값만 업데이트하고, 수정된 HardState를 다시 저장하는 함수입니다. 이 함수는 set_hard_state를 호출하여 변경된 상태를 저장하는 구조로 되어 있습니다.

전체적인 동작 요약:

  • set_hard_state: 전체 HardState를 저장하는 함수.
  • set_hard_state_commit: HardStatecommit 값을 수정하고 다시 저장하는 함수.

이 두 함수는 Raft 알고리즘에서 HardState와 그 commit 값을 관리하는 데 중요한 역할을 하며, 로그의 지속성과 상태 관리를 지원합니다.

5. 기타 메서드 - 로그 관리 및 압축

변경된 두 함수(compactall_entries)에서 주요한 변화는 compact 함수의 간소화all_entries 함수에서 에러 처리 로직을 강화한 부분입니다. 각 함수의 변화와 그 의미를 하나씩 살펴보겠습니다.

1) compact 함수 변경:

기존 코드:

fn compact(&mut self, index: u64) -> Result<()> {
    // Compress the data within a specific key range
    let start_key = format!("{:020}", 0);  // Start of the range
    let end_key = format!("{:020}", index);  // End of the range

    // Specify the range for compression in RocksDB
    self.db.compact_range(Some(&start_key), Some(&end_key));

    Ok(())
}

변경된 코드:

fn compact(&mut self, index: u64) -> Result<()> {
    // Compacting the log (removing old entries)
    // RocksDB handles this automatically, so special implementation might not be necessary.
    Ok(())
}

변화 분석:

  • 기존 코드에서는 compact_range를 사용해 특정 범위의 데이터를 압축(또는 정리)하는 기능을 구현했습니다. start_keyend_key 사이의 데이터를 RocksDB에서 압축하는 방식이었습니다.
  • 변경된 코드에서는 압축 작업을 생략하고, RocksDB가 이 작업을 자동으로 처리한다고 가정하고 있습니다. 즉, 별도의 구현 없이 바로 Ok(())를 반환하며 함수를 종료합니다.

의미:

  • 이 변경은 압축 로직의 단순화를 의미합니다. RocksDB는 내부적으로 데이터 관리를 위한 컴팩션(compaction)을 자동으로 수행할 수 있기 때문에 사용자 코드에서 명시적으로 호출하지 않아도 됩니다.

2) all_entries 함수 변경:

기존 코드:

fn all_entries(&self) -> raft::Result<Vec<Entry>> {
    let mut entries = Vec::new();
    let iter = self.db.iterator(rocksdb::IteratorMode::Start);

    for (key, value) in iter {
        let entry = Entry::decode(&*value)?;  // Decode entry and propagate error
        entries.push(entry);
    }

    Ok(entries)
}

변경된 코드:

fn all_entries(&self) -> crate::raft::Result<Vec<Entry>> {
    let mut entries = Vec::new();
    let iter = self.db.iterator(rocksdb::IteratorMode::Start);

    for item in iter {
        match item {
            Ok((_key, value)) => match Entry::decode(&*value) {
                Ok(entry) => entries.push(entry),
                Err(e) => {
                    return Err(crate::raft::Error::Store(crate::raft::StorageError::Other(
                        Box::new(e),
                    )))
                }
            },
            Err(e) => {
                return Err(crate::raft::Error::Store(crate::raft::StorageError::Other(
                    Box::new(e),
                )))
            }
        }
    }

    Ok(entries)
}

변화 분석:

  • 에러 처리 강화: 가장 큰 변화는 에러 처리 방식입니다. 기존 코드에서는 Entry::decode에서 발생하는 에러만 ? 연산자를 사용해 상위로 전파했습니다. 하지만 변경된 코드에서는 두 가지 경우에 대한 에러 처리를 명시적으로 구현했습니다:
    1. Iterator에서의 에러 처리: self.db.iterator()에서 발생하는 에러도 처리합니다. itemErr(e)인 경우 이를 명시적으로 처리하여 crate::raft::Error::Store로 래핑한 후 반환합니다.
    2. Entry::decode에서의 에러 처리: Entry::decode에서 발생하는 에러 역시 crate::raft::Error::Store로 래핑하여 처리합니다.
  • ? 연산자에서 명시적인 match 문 사용: 기존에는 ? 연산자로 에러를 전파했으나, 변경된 코드는 모든 에러 상황을 명시적인 match 문을 사용해 처리하며, 보다 명확하게 에러 처리를 수행합니다.

의미:

  • 이 변경은 에러 처리의 명시적 강화입니다. RocksDB에서 데이터를 읽을 때 발생할 수 있는 여러 에러 상황에 대해 구체적으로 핸들링하며, 프로그램이 예기치 않게 종료되지 않고 적절한 에러 처리를 하도록 설계되었습니다.

요약:

  1. compact 함수:
    • 기존 코드에서 특정 키 범위를 압축하는 기능을 제공했으나 변경된 코드에서는 RocksDB가 자동으로 컴팩션을 처리한다고 가정하고 로직을 제거했습니다. 이로 인해 함수는 단순히 Ok(())만 반환하는 구조로 바뀌었습니다.
  2. all_entries 함수:
    • 기존 코드에서는 RocksDB에서 데이터를 읽고 디코딩하는 과정에서 에러가 발생하면 이를 상위로 전파했습니다.
    • 변경된 코드에서는 명시적으로 에러를 처리하고, RocksDB의 iterator에서 발생하는 에러와 Entry::decode에서 발생하는 에러를 각각 명시적으로 핸들링하도록 강화되었습니다.

6. Storage 트레이트 구현

변경된 코드에서 crate::raft::Result로 수정한 부분은 모듈 경로를 명시적으로 지정한 것입니다. 이는 Rust에서의 모듈 경로 관리와 관련이 있습니다. crate를 사용한 변화에 대해 설명드리겠습니다.

1) 기존 코드 (raft::Result):

fn initial_state(&self) -> raft::Result<RaftState> {
  • 이 코드에서는 raft라는 모듈의 Result 타입을 사용하고 있습니다. 여기서 raft 모듈이 어디에 정의되어 있는지에 따라 해당 모듈을 찾게 됩니다.
  • 이 경우, Rust는 현재 스코프 내에서 raft 모듈을 탐색하며, 외부 크레이트나 내부 모듈일 수 있습니다.

2) 변경된 코드 (crate::raft::Result):

fn initial_state(&self) -> crate::raft::Result<RaftState> {
  • crate::를 사용함으로써, Result 타입이 현재 크레이트(현재 프로젝트)의 raft 모듈 내에 정의되어 있다는 것을 명확히 나타냅니다.
  • crate::현재 크레이트의 루트 모듈을 나타내므로, crate::raft::Result는 "현재 프로젝트에서 raft라는 모듈의 Result 타입"을 사용하겠다는 의미입니다.

3) 의미 변화:

  • 기존 코드에서의 raft::Result는 현재 모듈 또는 외부 라이브러리의 raft 모듈을 참조할 수 있었으며, 이로 인해 모호성이 있을 수 있습니다. 즉, 다른 외부 크레이트(예: raft 라이브러리)를 참조하고 있는지, 아니면 현재 프로젝트의 내부 모듈을 참조하고 있는지 불명확할 수 있습니다.
  • 변경된 코드에서는 crate::raft::Result로 명시하여, 이 Result 타입이 현재 프로젝트의 raft 모듈에서 가져온 것임을 확실히 했습니다. 이는 모듈 경로를 명확히 하여, 어디서 이 타입을 사용하는지 더 직관적으로 알 수 있습니다.

4) 변경의 의의:

  • 명확한 모듈 경로 지정: crate::를 사용하여 명확하게 현재 프로젝트 내의 raft 모듈을 참조하고 있다는 것을 드러냅니다. 이는 특히 프로젝트가 커지고, 여러 외부 라이브러리 또는 내부 모듈들이 복잡하게 얽혀 있을 때 매우 유용합니다.
  • 에러 방지: 만약 외부에서 동일한 이름의 모듈(예: raft)을 사용하고 있다면, crate::raft::Result로 명시적으로 내부 모듈을 참조함으로써 모호성을 피하고 잠재적인 충돌을 방지할 수 있습니다.

결론:

crate::raft::Result로 변경한 것은 현재 크레이트의 raft 모듈에서 Result 타입을 사용한다는 것을 명확히 하기 위한 것입니다. 이는 프로젝트의 모듈 구조를 명확하게 하고, 코드의 가독성을 높이면서, 모듈 간의 충돌이나 모호성을 줄이는 역할을 합니다.

7. Storage 트레이트 구현

1) entries 함수에서의 None 처리 변경

기존 코드:

if let Some(value) = self.db.get(key)? {
    let entry = Entry::decode(&*value)?;
    entries.push(entry);
}

변경된 코드:

match self.db.get(key).unwrap() {
    Some(value) => {
        let entry = Entry::decode(&*value).unwrap();
        entries.push(entry);
    }
    None => continue,
}

변화 설명:

  • 기존 코드에서는 None일 경우 아무 동작도 하지 않고 넘어갔습니다.
  • 변경된 코드에서는 None을 명시적으로 처리하고 continue로 루프를 넘어갑니다.
  • 기능적으로는 두 코드 모두 None일 때 루프를 넘어가는 점은 동일하지만, 변경된 코드에서는 None을 좀 더 명확하게 처리하고 있습니다.

2) first_indexlast_index 함수에서의 변경

기존 코드:

let iter = self.db.iterator(rocksdb::IteratorMode::Start);
match iter.next() {
    Some((key, _)) => {
        let idx = String::from_utf8(key.to_vec())?.parse::<u64>()?;
        Ok(idx)
    }
    None => Ok(0),
}

변경된 코드:

let mut iter = self.db.iterator(rocksdb::IteratorMode::Start);
match iter.next() {
    Some(Ok((key, _))) => {
        let idx = String::from_utf8(key.to_vec()).map_err(|_| {
            jopemachine_raft::Error::Store(crate::raft::StorageError::Unavailable)
        })?;
        let idx = idx.parse::<u64>().map_err(|_| {
            jopemachine_raft::Error::Store(crate::raft::StorageError::Unavailable)
        })?;
        Ok(idx)
    }
    Some(Err(_)) => Err(jopemachine_raft::Error::Store(
        crate::raft::StorageError::Unavailable,
    )),
    None => Ok(0),
}

변화 설명:

  • Iterator의 에러 처리:
    • 기존 코드에서는 iter.next()가 반환하는 값에 대해 에러 처리를 하지 않았습니다.
    • 변경된 코드에서는 iter.next()에서 에러가 발생할 경우 (Some(Err(_))) 명시적으로 에러를 처리하고 StorageError::Unavailable 에러를 반환합니다.
  • UTF-8 디코딩 및 파싱 오류 처리:
    • 기존 코드에서는 UTF-8 변환과 u64 파싱에서 발생하는 에러를 ? 연산자를 사용해 전파했습니다.
    • 변경된 코드에서는 map_err를 사용해 에러를 명시적으로 처리하고, 에러 발생 시 StorageError::Unavailable을 반환하도록 변경했습니다.

결론:

  1. entries 함수: None 값을 명시적으로 처리하여 코드의 가독성을 높였고, continue로 루프를 넘어가는 방식이 더 명확해졌습니다.
  2. first_indexlast_index 함수: Iterator에서 발생하는 에러와 UTF-8 변환 및 u64 파싱 에러에 대한 처리 방식을 강화하여, 에러 발생 시 명시적으로 처리하는 로직이 추가되었습니다.
profile
터널을 지나고 있을 뿐, 길은 여전히 열려 있다.

0개의 댓글