Anti-entropy mechanisms

Migo · November 20, 2024

Entropy?

Entropy is a measure of disorder in a system. Applied to a distributed system, it describes how far the states of the participating nodes have diverged from one another.
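To make "degree of state divergence" concrete, here is a toy measure that assumes each node's state is a plain key-value map. The `divergence` function and the metric itself are illustrative assumptions, not a standard definition: it counts keys whose values differ or that exist on only one node.

```rust
use std::collections::HashMap;

/// Toy divergence metric (an assumption for illustration): the number of keys
/// that are missing on one side or hold different values on the two nodes.
fn divergence(a: &HashMap<String, i32>, b: &HashMap<String, i32>) -> usize {
    // Keys on `a` that are missing from `b` or hold a different value there.
    let from_a = a.iter().filter(|(k, v)| b.get(*k) != Some(*v)).count();
    // Keys that exist only on `b`.
    let only_b = b.keys().filter(|k| !a.contains_key(*k)).count();
    from_a + only_b
}
```

Two fully synchronized nodes score 0; every missing or mismatched key adds 1.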

Anti-entropy

Anti-entropy is a way to bring drifted nodes back into agreement after update delivery has failed. It runs a separate process, either in the foreground or in the background, that compares replicas and reconciles conflicting records.
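A single anti-entropy round can be sketched as a pairwise compare-and-reconcile pass. This assumes every record carries a version number and that the higher version wins; `Versioned`, `Store`, and `anti_entropy_round` are hypothetical names used only for illustration.

```rust
use std::collections::{HashMap, HashSet};

/// A record tagged with a version number. One increasing version per key is an
/// assumption of this sketch; real stores may use timestamps or vector clocks.
#[derive(Debug, Clone, PartialEq)]
struct Versioned {
    version: u64,
    value: String,
}

/// Hypothetical replica state: key -> versioned record.
type Store = HashMap<String, Versioned>;

/// One anti-entropy round: compare every key and copy the newer record to
/// whichever side is missing it or holds an older version.
/// Returns how many records were repaired.
fn anti_entropy_round(a: &mut Store, b: &mut Store) -> usize {
    // Union of keys, collected up front so both maps can be mutated afterwards.
    let keys: HashSet<String> = a.keys().chain(b.keys()).cloned().collect();
    let mut repaired = 0;
    for key in keys {
        // Pick the newest record; equal versions are assumed to hold equal values.
        let newest = match (a.get(&key), b.get(&key)) {
            (Some(x), Some(y)) if x.version == y.version => continue,
            (Some(x), Some(y)) => {
                if x.version > y.version { x.clone() } else { y.clone() }
            }
            (Some(x), None) => x.clone(),
            (None, Some(y)) => y.clone(),
            (None, None) => continue,
        };
        // Reconcile: overwrite whichever side is stale or missing the key.
        if a.get(&key) != Some(&newest) {
            a.insert(key.clone(), newest.clone());
            repaired += 1;
        }
        if b.get(&key) != Some(&newest) {
            b.insert(key, newest);
            repaired += 1;
        }
    }
    repaired
}
```

A second round on already-converged replicas repairs nothing. Scanning every key on every round is expensive, which is what the mechanisms listed below try to avoid.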

Anti-entropy mechanisms

  • Hinted handoff
  • Read-repair
  • Merkle trees
  • Dotted version vectors

Each approach optimizes for one of three goals:
1) scope reduction
2) recency
3) completeness

For example, read repair narrows the scope: it synchronizes only the data that is actively being queried, which is much cheaper than comparing everything.
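A minimal read-repair sketch, assuming each replica maps a key to a `(version, value)` pair and that the highest version is the correct one. `Replica` and `read_with_repair` are hypothetical names; a real system might only contact the replicas involved in the read rather than all of them.

```rust
use std::collections::HashMap;

/// Hypothetical replica: key -> (version, value). Versions are assumed to be
/// totally ordered per key, which is a simplification.
type Replica = HashMap<String, (u64, String)>;

/// Read `key` from every replica, return the newest value, and write it back
/// to any replica that answered with an older version or nothing at all.
/// Only the key being read is synchronized.
fn read_with_repair(replicas: &mut [Replica], key: &str) -> Option<String> {
    // Find the newest (version, value) among the replicas that have the key.
    let newest = replicas
        .iter()
        .filter_map(|r| r.get(key))
        .max_by_key(|(version, _)| *version)
        .cloned()?;
    // Repair stale or missing copies.
    for replica in replicas.iter_mut() {
        if replica.get(key) != Some(&newest) {
            replica.insert(key.to_string(), newest.clone());
        }
    }
    Some(newest.1)
}
```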

If the system assumes that most failures are temporary, we can store a log of the recently diverged events and replay them later, once the affected node is reachable again (hinted handoff).
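Storing recently missed writes and replaying them once a node recovers is the idea behind hinted handoff. The sketch below assumes a single coordinator that already knows which replicas are down; `Coordinator`, `mark_down`, `write`, and `recover` are hypothetical names. Hints live in memory here, whereas a real system would persist them and likely expire old ones.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical coordinator that buffers writes ("hints") for unreachable
/// replicas and replays them once those replicas recover.
struct Coordinator {
    replicas: HashMap<String, HashMap<String, String>>, // replica id -> (key -> value)
    down: HashSet<String>,                               // replicas currently unreachable
    hints: HashMap<String, Vec<(String, String)>>,       // replica id -> missed writes
}

impl Coordinator {
    fn new(ids: &[&str]) -> Self {
        Self {
            replicas: ids.iter().map(|id| (id.to_string(), HashMap::new())).collect(),
            down: HashSet::new(),
            hints: HashMap::new(),
        }
    }

    fn mark_down(&mut self, id: &str) {
        self.down.insert(id.to_string());
    }

    /// Apply the write to every live replica; store a hint for each unreachable one.
    fn write(&mut self, key: &str, value: &str) {
        for (id, store) in self.replicas.iter_mut() {
            if self.down.contains(id) {
                self.hints
                    .entry(id.clone())
                    .or_default()
                    .push((key.to_string(), value.to_string()));
            } else {
                store.insert(key.to_string(), value.to_string());
            }
        }
    }

    /// Mark the replica as live again and replay its hints in the order recorded.
    fn recover(&mut self, id: &str) {
        self.down.remove(id);
        if let (Some(pending), Some(store)) = (self.hints.remove(id), self.replicas.get_mut(id)) {
            for (key, value) in pending {
                store.insert(key, value);
            }
        }
    }
}
```

Replaying hints blindly, as done here, could overwrite newer data if the replica received other writes in the meantime; attaching versions to hints avoids that.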

If the entire dataset has to be compared, we can hash the data and compare the hashes instead of the raw records (Merkle trees).
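A small Merkle-tree sketch: keys are bucketed into a fixed number of leaves, each leaf hashes its bucket, and each parent hashes its two children. Comparing two trees from the root down only descends into subtrees whose hashes differ, so identical ranges are skipped. The bucket count `LEAVES`, the helper names, and the use of std's `DefaultHasher` (not cryptographic, and not guaranteed stable across Rust releases) are assumptions for illustration.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::BTreeMap;
use std::hash::{Hash, Hasher};

const LEAVES: usize = 4; // number of leaf buckets (assumed; must be a power of two)

fn hash_of<T: Hash + ?Sized>(item: &T) -> u64 {
    let mut h = DefaultHasher::new();
    item.hash(&mut h);
    h.finish()
}

/// Build the tree level by level: levels[0] = leaf hashes, last level = root.
fn build_tree(data: &BTreeMap<String, String>) -> Vec<Vec<u64>> {
    // Bucket every (key, value) pair by the hash of its key.
    let mut buckets: Vec<Vec<(&String, &String)>> = vec![Vec::new(); LEAVES];
    for (k, v) in data {
        buckets[(hash_of(k) as usize) % LEAVES].push((k, v));
    }
    // Each leaf hashes its bucket (BTreeMap iteration keeps entries sorted).
    let mut levels = vec![buckets.iter().map(|b| hash_of(b)).collect::<Vec<u64>>()];
    // Each parent hashes the pair of child hashes below it.
    while levels.last().unwrap().len() > 1 {
        let parents: Vec<u64> = levels.last().unwrap().chunks(2).map(|pair| hash_of(pair)).collect();
        levels.push(parents);
    }
    levels
}

/// Walk both trees from the root down, descending only where hashes differ.
/// Returns the indices of leaf buckets that need to be synchronized.
fn diff_leaves(a: &[Vec<u64>], b: &[Vec<u64>]) -> Vec<usize> {
    let top = a.len() - 1;
    let mut suspects = vec![0usize]; // node indices to check at the current level
    for level in (0..=top).rev() {
        suspects.retain(|&i| a[level][i] != b[level][i]);
        if level > 0 {
            // Expand each differing node into its two children one level down.
            suspects = suspects.iter().flat_map(|&i| [2 * i, 2 * i + 1]).collect();
        }
    }
    suspects
}
```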

Example in Rust (Dotted Version Vector)

Scenario

Imagine a distributed key-value store with three replicas: A, B, and C. Each replica can modify a key independently, and a dotted version vector is used to detect conflicting (concurrent) updates when the replicas synchronize.

Initial State

Key: x
Initial value: 10
Dotted Version Vector: {} (empty, no updates yet)

Replica A increments the value

Replica A updates x to 11.
It increments its counter in the version vector and creates a dot for this update.
New state at A:
Value: 11
Dotted Version Vector: {A: 1}

Replica B decrements the value

Replica B updates x to 9.
It increments its counter in the version vector and creates a dot for this update.
New state at B:
Value: 9
Dotted Version Vector: {B: 1}

Replicas A and B sync, revealing a conflict

During synchronization, A and B exchange their updates.
Both replicas note that their dots are distinct and not causally related (no dependency or overwrite).
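The "not causally related" check is a pointwise comparison of version vectors. A sketch, assuming a vector is a map from replica ID to counter with missing entries treated as 0; `compare` and `Causality` are hypothetical names.

```rust
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
enum Causality {
    Equal,
    Before,     // a happened before b
    After,      // b happened before a
    Concurrent, // each side has an update the other has not seen
}

/// Compare two version vectors entry by entry (missing entries count as 0).
fn compare(a: &HashMap<String, u64>, b: &HashMap<String, u64>) -> Causality {
    let mut a_ahead = false;
    let mut b_ahead = false;
    for id in a.keys().chain(b.keys()) {
        let x = a.get(id).copied().unwrap_or(0);
        let y = b.get(id).copied().unwrap_or(0);
        if x > y {
            a_ahead = true;
        }
        if y > x {
            b_ahead = true;
        }
    }
    match (a_ahead, b_ahead) {
        (false, false) => Causality::Equal,
        (false, true) => Causality::Before,
        (true, false) => Causality::After,
        (true, true) => Causality::Concurrent,
    }
}
```

Comparing {A: 1} with {B: 1} gives `Concurrent`, which is exactly the A/B situation here, so neither update may overwrite the other.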

Merged state at A and B:
Conflict exists: {A: 1, B: 1}
Values: [11, 9] (both updates retained for conflict resolution)
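Keeping both values requires storing each value next to the dot that created it. Below is a simplified multi-value register in that spirit, assuming a replica's own write supersedes everything it currently holds. `MvRegister` and its methods are hypothetical; this sketches the sibling-tracking idea rather than the exact dotted version vector structure from the literature. The rule in `merge`: keep a sibling if the other side also holds it, or if the other side has never seen its dot.

```rust
use std::collections::HashMap;

type Dot = (String, u64); // (replica ID, counter) that created a value

/// Concurrent values ("siblings") are kept side by side, each tagged with its dot.
/// `context` records every dot this replica has seen.
#[derive(Debug, Clone, Default)]
struct MvRegister {
    context: HashMap<String, u64>,
    siblings: Vec<(Dot, i32)>,
}

impl MvRegister {
    /// True if this replica has already seen `dot`.
    fn covers(&self, dot: &Dot) -> bool {
        self.context.get(&dot.0).map_or(false, |&c| dot.1 <= c)
    }

    /// A local write gets a fresh dot and supersedes every sibling held locally.
    fn write(&mut self, replica: &str, value: i32) {
        let counter = self.context.entry(replica.to_string()).or_insert(0);
        *counter += 1;
        self.siblings = vec![((replica.to_string(), *counter), value)];
    }

    fn merge(&mut self, other: &Self) {
        let mut kept: Vec<(Dot, i32)> = Vec::new();
        // Keep our sibling if `other` also has it or has never seen its dot.
        for (dot, v) in &self.siblings {
            if other.siblings.iter().any(|(d, _)| d == dot) || !other.covers(dot) {
                kept.push((dot.clone(), *v));
            }
        }
        // Keep their sibling if we do not already have it and have never seen it.
        for (dot, v) in &other.siblings {
            if !kept.iter().any(|(d, _)| d == dot) && !self.covers(dot) {
                kept.push((dot.clone(), *v));
            }
        }
        // The merged context is the pointwise maximum.
        for (id, &c) in &other.context {
            let e = self.context.entry(id.clone()).or_insert(0);
            *e = (*e).max(c);
        }
        self.siblings = kept;
    }

    /// Current sibling values, sorted for easy inspection.
    fn values(&self) -> Vec<i32> {
        let mut vs: Vec<i32> = self.siblings.iter().map(|(_, v)| *v).collect();
        vs.sort();
        vs
    }
}
```

The initial value 10 is left out for brevity. Replaying the scenario gives siblings [9, 11] after the A/B sync and [9, 11, 15] once A merges with C; a later write at A, made after seeing all three, replaces them.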

Replica C updates the value independently

Replica C updates x to 15 and increments its version vector.
New state at C:
Value: 15
Dotted Version Vector: {C: 1}

Replica A syncs with C

A merges its state with C.
Now, the combined version vector reflects all updates: {A: 1, B: 1, C: 1}.
Conflicts can now be resolved (e.g., using application-specific rules or last-write-wins).

Final merged state at A and C:
Conflict values: [11, 9, 15]
Dotted Version Vector: {A: 1, B: 1, C: 1}
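Resolving the siblings is a separate, application-level step. A sketch of the two options mentioned above, assuming each sibling carries a wall-clock `timestamp` (an assumption: dots alone do not order concurrent writes). `Sibling`, `resolve_lww`, and `resolve_max` are hypothetical names.

```rust
/// A conflicting value plus the metadata needed to resolve it.
/// `timestamp` is an assumed wall-clock time used only by last-write-wins.
#[derive(Debug, Clone)]
struct Sibling {
    replica: String,
    timestamp: u64,
    value: i32,
}

/// Last-write-wins: the greatest timestamp wins, ties broken by replica ID
/// so that every node picks the same winner.
fn resolve_lww(siblings: &[Sibling]) -> Option<i32> {
    siblings
        .iter()
        .max_by(|a, b| (a.timestamp, &a.replica).cmp(&(b.timestamp, &b.replica)))
        .map(|s| s.value)
}

/// An application-specific rule, e.g. "keep the largest value".
fn resolve_max(siblings: &[Sibling]) -> Option<i32> {
    siblings.iter().map(|s| s.value).max()
}
```

Last-write-wins is simple but silently discards the losing writes and relies on reasonably synchronized clocks; a rule such as "keep the largest value" needs no clocks but only makes sense for some data.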

Code

use std::collections::{HashMap, HashSet};

#[derive(Debug, Clone, Eq, PartialEq)]
struct DottedVersionVector {
    vector: HashMap<String, u64>, // Replica ID -> Counter
    dots: HashSet<(String, u64)>, // Set of individual updates (Replica ID, Counter)
}

impl DottedVersionVector {
    fn new() -> Self {
        Self {
            vector: HashMap::new(),
            dots: HashSet::new(),
        }
    }

    /// Increment the counter for a given replica and record a dot.
    fn increment(&mut self, replica_id: &str) {
        let counter = self.vector.entry(replica_id.to_string()).or_insert(0);
        *counter += 1;
        self.dots.insert((replica_id.to_string(), *counter));
    }

    /// Merge another version vector into this one.
    fn merge(&mut self, other: &Self) {
        // Merge the vector (max of counters)
        for (replica_id, &counter) in &other.vector {
            let entry = self.vector.entry(replica_id.clone()).or_insert(0);
            *entry = (*entry).max(counter);
        }

        // Merge the dots
        self.dots.extend(other.dots.clone());
    }
}

#[derive(Debug, Clone)]
struct Replica {
    id: String,
    value: i32,
    dvv: DottedVersionVector,
}

impl Replica {
    fn new(id: &str, initial_value: i32) -> Self {
        Self {
            id: id.to_string(),
            value: initial_value,
            dvv: DottedVersionVector::new(),
        }
    }

    /// Update the value and increment the dotted version vector.
    fn update(&mut self, delta: i32) {
        self.value += delta;
        self.dvv.increment(&self.id);
    }

    /// Sync with another replica by merging causal metadata in both directions.
    /// Only the version vectors are exchanged; the values are not merged, so
    /// retaining and resolving conflicting values is left to the caller.
    fn sync(&mut self, other: &mut Self) {
        self.dvv.merge(&other.dvv);
        other.dvv.merge(&self.dvv);

        println!(
            "Synced replicas {} and {}. Conflicting values must be resolved separately.",
            self.id, other.id
        );
    }
}


#[test]
fn test() {
    // Initialize replicas
    let mut replica_a = Replica::new("A", 10);
    let mut replica_b = Replica::new("B", 10);
    let mut replica_c = Replica::new("C", 10);

    // Replica A increments the value
    replica_a.update(1); // New value: 11
    assert_eq!(replica_a.value, 11);
    assert_eq!(
        replica_a.dvv,
        DottedVersionVector {
            vector: [("A".to_string(), 1)].into_iter().collect(),
            dots: [("A".to_string(), 1)].into_iter().collect(),
        }
    );

    // Replica B decrements the value
    replica_b.update(-1); // New value: 9
    assert_eq!(replica_b.value, 9);
    assert_eq!(
        replica_b.dvv,
        DottedVersionVector {
            vector: [("B".to_string(), 1)].into_iter().collect(),
            dots: [("B".to_string(), 1)].into_iter().collect(),
        }
    );

    // A and B sync (causing a conflict)
    replica_a.sync(&mut replica_b);
    assert_eq!(
        replica_a.dvv,
        DottedVersionVector {
            vector: [("A".to_string(), 1), ("B".to_string(), 1)]
                .into_iter()
                .collect(),
            dots: [("A".to_string(), 1), ("B".to_string(), 1)]
                .into_iter()
                .collect(),
        }
    );

    // Replica C independently updates the value
    replica_c.update(5); // New value: 15
    assert_eq!(
        replica_c.dvv,
        DottedVersionVector {
            vector: [("C".to_string(), 1)].into_iter().collect(),
            dots: [("C".to_string(), 1)].into_iter().collect(),
        }
    );

    // A and C sync
    replica_a.sync(&mut replica_c);
    assert_eq!(
        replica_a.dvv,
        DottedVersionVector {
            vector: [
                ("A".to_string(), 1),
                ("B".to_string(), 1),
                ("C".to_string(), 1)
            ]
            .into_iter()
            .collect(),
            dots: [
                ("A".to_string(), 1),
                ("B".to_string(), 1),
                ("C".to_string(), 1)
            ]
            .into_iter()
            .collect(),
        }
    );
}