[에어비앤비] Mussel 대규모 분산 키-값 저장소의 비밀

궁금하면 500원·2025년 1월 9일

IT이야기

목록 보기
6/12

대규모 분산 키-값 저장소 구현하기

에어비앤비가 직면한 데이터 인프라의 도전과 Mussel을 통한 해결 과정은 현대 기술 기업이 마주하는 확장성 문제의 전형적인 사례를 보여줍니다.

이 분석에서는 단순한 기술 구현을 넘어, 비즈니스 요구사항이 어떻게 아키텍처 결정에 영향을 미쳤는지, 그리고 그 과정에서 얻은 실용적인 교훈들을 공유하고자 합니다.

🗃️ 에어비앤비의 데이터 의존성

에어비앤비는 원시 데이터뿐만 아니라, 사용자 경험을 개인화하는 파생 데이터에 의존합니다.
이 데이터는 대량의 오프라인 정보로, 개인 맞춤 서비스를 제공하는 데 필수적입니다.

🛠️ Mussel 아키텍처와 성능

Mussel은 HFileService와 Nebula의 한계를 극복하고, Apache Helix와 Kafka를 활용해 높은 가용성과 효율성을 제공합니다.
초당 800,000회의 쿼리를 처리하며, 평균 읽기 지연 시간은 8밀리초 미만입니다.

📊 Mussel의 데이터 처리 혁신

Mussel은 실시간과 배치 데이터를 통합하여 데이터 관리의 효율성을 크게 향상시켰습니다.
이를 통해 에어비앤비의 데이터 인프라는 130TB 이상을 처리하고, 99.9% 이상의 가용성을 유지합니다.
이 아티클은 Mussel을 통해 에어비앤비가 직면한 데이터 관리 문제를 해결하며 확장성, 낮은 지연 시간, 운영 복잡성을 해결하는 강력한 키-값 저장소의 진화를 보여줍니다.

1. Apache Helix를 활용한 고가용성 클러스터 관리 구현

1.1 클러스터 구성 관리자 구현

@Slf4j
public class MusselClusterManager {
    private final HelixManager helixManager;
    private final HelixAdmin helixAdmin;
    private final String clusterName;
    
    public MusselClusterManager(String zkConnectString, String clusterName) {
        this.clusterName = clusterName;
        this.helixManager = HelixManagerFactory.getZKHelixManager(
            clusterName,
            "CONTROLLER",
            InstanceType.CONTROLLER,
            zkConnectString
        );
        this.helixAdmin = new ZKHelixAdmin(zkConnectString);
        
        initializeCluster();
    }
    
    private void initializeCluster() {
        // 클러스터가 존재하지 않으면 생성
        if (!helixAdmin.getClusters().contains(clusterName)) {
            helixAdmin.addCluster(clusterName);
            
            // 상태 모델 정의
            StateModelDefinition stateModel = defineStateModel();
            helixAdmin.addStateModelDef(clusterName, "OnlineOffline", stateModel);
            
            // 리소스 설정 (1024 파티션)
            int numPartitions = 1024;
            int numReplicas = 3;
            
            helixAdmin.addResource(
                clusterName,
                "MusselResource",
                numPartitions,
                "OnlineOffline",
                "FULL_AUTO"
            );
        }
    }
    
    private StateModelDefinition defineStateModel() {
        StateModelDefinitionBuilder builder = new StateModelDefinitionBuilder("OnlineOffline");
        
        // 상태 정의
        builder.addState("ONLINE", 1);
        builder.addState("OFFLINE");
        builder.addState("DROPPED");
        
        // 전이 정의
        builder.addTransition("OFFLINE", "ONLINE", 1);
        builder.addTransition("ONLINE", "OFFLINE", 2);
        builder.addTransition("OFFLINE", "DROPPED", 3);
        
        return builder.build();
    }
}

1.2 샤드 재분배 관리자 구현

@Slf4j
public class ShardRebalanceManager {
    private final MusselClusterManager clusterManager;
    private final LoadBalancer loadBalancer;
    
    public void rebalanceCluster() {
        Map<String, List<String>> currentAssignment = getCurrentAssignment();
        Map<String, Integer> nodeCounts = getNodeLoadCounts();
        
        // 노드당 부하 체크
        double avgLoad = calculateAverageLoad(nodeCounts);
        List<String> overloadedNodes = findOverloadedNodes(nodeCounts, avgLoad);
        
        if (!overloadedNodes.isEmpty()) {
            rebalanceShards(overloadedNodes, currentAssignment);
        }
    }
    
    private void rebalanceShards(
        List<String> overloadedNodes,
        Map<String, List<String>> currentAssignment
    ) {
        for (String node : overloadedNodes) {
            List<String> shards = currentAssignment.get(node);
            int shardsToMove = calculateShardsToMove(shards.size());
            
            for (int i = 0; i < shardsToMove; i++) {
                String shard = shards.get(i);
                String targetNode = findLeastLoadedNode();
                
                try {
                    moveShardToNode(shard, node, targetNode);
                    log.info("Moved shard {} from {} to {}", shard, node, targetNode);
                } catch (Exception e) {
                    log.error("Failed to move shard: " + e.getMessage());
                }
            }
        }
    }
}

2. Kafka 기반의 리더리스 복제 메커니즘 상세 분석

2.1 Kafka 프로듀서 구현

@Slf4j
public class MusselKafkaProducer {
    private final KafkaProducer<String, byte[]> producer;
    private final String topic;
    
    public MusselKafkaProducer(Properties config, String topic) {
        this.producer = new KafkaProducer<>(getKafkaConfig(config));
        this.topic = topic;
    }
    
    private Properties getKafkaConfig(Properties config) {
        Properties kafkaConfig = new Properties();
        kafkaConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
                       config.getProperty("kafka.bootstrap.servers"));
        kafkaConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
                       StringSerializer.class.getName());
        kafkaConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
                       ByteArraySerializer.class.getName());
        kafkaConfig.put(ProducerConfig.ACKS_CONFIG, "all");
        kafkaConfig.put(ProducerConfig.RETRIES_CONFIG, 3);
        kafkaConfig.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        
        return kafkaConfig;
    }
    
    public CompletableFuture<RecordMetadata> writeData(
        String key, 
        byte[] value, 
        long timestamp
    ) {
        ProducerRecord<String, byte[]> record = 
            new ProducerRecord<>(topic, null, timestamp, key, value);
            
        CompletableFuture<RecordMetadata> future = new CompletableFuture<>();
        
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                future.completeExceptionally(exception);
            } else {
                future.complete(metadata);
            }
        });
        
        return future;
    }
}

2.2 리더리스 복제 관리자 구현

public class LeaderlessReplicationManager {
    private final MusselKafkaProducer producer;
    private final List<MusselKafkaConsumer> consumers;
    private final ConcurrentMap<String, CompletableFuture<Void>> pendingWrites;
    
    public CompletableFuture<Void> write(String key, byte[] value) {
        CompletableFuture<Void> writeFuture = new CompletableFuture<>();
        pendingWrites.put(key, writeFuture);
        
        // Kafka에 쓰기
        producer.writeData(key, value, System.currentTimeMillis())
            .thenAccept(metadata -> {
                // 복제 확인 대기
                waitForReplication(key, metadata.offset())
                    .thenRun(() -> {
                        pendingWrites.remove(key);
                        writeFuture.complete(null);
                    })
                    .exceptionally(e -> {
                        writeFuture.completeExceptionally(e);
                        return null;
                    });
            })
            .exceptionally(e -> {
                writeFuture.completeExceptionally(e);
                return null;
            });
            
        return writeFuture;
    }
    
    private CompletableFuture<Void> waitForReplication(String key, long offset) {
        int requiredAcks = consumers.size() / 2 + 1; // 과반수 확인
        AtomicInteger acks = new AtomicInteger(0);
        CompletableFuture<Void> future = new CompletableFuture<>();
        
        // 복제 확인 타임아웃 설정
        scheduleTimeout(future, key, 5000);
        
        for (MusselKafkaConsumer consumer : consumers) {
            consumer.confirmReplication(offset)
                .thenRun(() -> {
                    if (acks.incrementAndGet() >= requiredAcks) {
                        future.complete(null);
                    }
                })
                .exceptionally(e -> {
                    future.completeExceptionally(e);
                    return null;
                });
        }
        
        return future;
    }
}

3. HRegion 기반 스토리지 엔진의 최적화 전략

3.1 LSM 트리 구현

public class LSMTree {
    private final MemTable memTable;
    private final List<SSTable> ssTables;
    private final BloomFilter bloomFilter;
    private final CompactionManager compactionManager;
    
    public void put(byte[] key, byte[] value) {
        // 메모리 테이블이 가득 찼는지 확인
        if (memTable.isFull()) {
            flushMemTable();
        }
        
        // 메모리 테이블에 쓰기
        memTable.put(key, value);
        bloomFilter.add(key);
    }
    
    public byte[] get(byte[] key) {
        // Bloom 필터로 빠른 네거티브 체크
        if (!bloomFilter.mightContain(key)) {
            return null;
        }
        
        // 메모리 테이블 먼저 확인
        byte[] value = memTable.get(key);
        if (value != null) {
            return value;
        }
        
        // SS 테이블 계층 순회
        for (SSTable ssTable : ssTables) {
            value = ssTable.get(key);
            if (value != null) {
                return value;
            }
        }
        
        return null;
    }
    
    private void flushMemTable() {
        SSTable newSSTable = memTable.flush();
        ssTables.add(newSSTable);
        memTable.clear();
        
        // 백그라운드에서 컴팩션 수행
        compactionManager.scheduleCompaction(ssTables);
    }
}

3.2 블록 캐시 관리자 구현

public class BlockCache {
    private final LoadingCache<BlockId, byte[]> cache;
    
    public BlockCache(long maxBytes) {
        this.cache = Caffeine.newBuilder()
            .maximumWeight(maxBytes)
            .weigher((BlockId key, byte[] value) -> value.length)
            .recordStats()
            .build(this::loadBlock);
    }
    
    private byte[] loadBlock(BlockId blockId) throws IOException {
        try (FileChannel channel = FileChannel.open(blockId.getFilePath())) {
            ByteBuffer buffer = ByteBuffer.allocate(blockId.getSize());
            channel.position(blockId.getOffset());
            channel.read(buffer);
            return buffer.array();
        }
    }
    
    public byte[] getBlock(BlockId blockId) {
        try {
            return cache.get(blockId);
        } catch (Exception e) {
            log.error("Failed to load block: " + blockId, e);
            return null;
        }
    }
    
    public CacheStats getStats() {
        return cache.stats();
    }
}

4. 실시간 데이터 처리와 배치 처리의 통합 구현

4.1 통합 데이터 처리 매니저

@Slf4j
public class IntegratedDataManager {
    private final LSMTree storage;
    private final KafkaConsumer<String, byte[]> realTimeConsumer;
    private final SparkSession sparkSession;
    private final BlockingQueue<DataEvent> eventQueue;
    
    public void processRealTimeData() {
        while (true) {
            ConsumerRecords<String, byte[]> records = 
                realTimeConsumer.poll(Duration.ofMillis(100));
                
            for (ConsumerRecord<String, byte[]> record : records) {
                DataEvent event = DataEvent.fromKafkaRecord(record);
                eventQueue.put(event);
            }
        }
    }
    
    public void processBatchData(String sourcePath) {
        Dataset<Row> batchData = sparkSession.read()
            .format("parquet")
            .load(sourcePath);
            
        // 증분 데이터만 처리
        Dataset<Row> deltaData = batchData
            .filter(col("timestamp").gt(lit(getLastProcessedTimestamp())));
            
        deltaData.foreachPartition(partition -> {
            while (partition.hasNext()) {
                Row row = partition.next();
                DataEvent event = DataEvent.fromSparkRow(row);
                eventQueue.put(event);
            }
        });
    }
    
    private class DataProcessor implements Runnable {
        public void run() {
            while (true) {
                DataEvent event = eventQueue.take();
                try {
                    storage.put(event.getKey(), event.getValue());
                } catch (Exception e) {
                    log.error("Failed to process event: " + event, e);
                }
            }
        }
    }
}

4.2 데이터 일관성 관리자

public class ConsistencyManager {
    private final ConcurrentNavigableMap<Long, Set<String>> timeIndex;
    private final ReadWriteLock lock;
    
    public void recordWrite(String key, long timestamp) {
        lock.writeLock().lock();
        try {
            timeIndex.computeIfAbsent(timestamp, k -> new ConcurrentHashSet<>())
                .add(key);
        } finally {
            lock.writeLock().unlock();
        }
    }
    
    public Map<String, byte[]> readConsistent(long timestamp) {
        lock.readLock().lock();
        try {
            Map<String, byte[]> result = new HashMap<>();
            
            // timestamp 이전의 모든 쓰기 수집
            NavigableMap<Long, Set<String>> previous = 
                timeIndex.headMap(timestamp, true);
                for (Map.Entry<Long, Set<String>> entry : previous.entrySet()) {
                for (String key : entry.getValue()) {
                    // 가장 최근 값만 유지
                    if (!result.containsKey(key)) {
                        result.put(key, storage.get(key));
                    }
                }
            }
            
            return result;
        } finally {
            lock.readLock().unlock();
        }
    }
    
    public void cleanupOldEntries(long retentionPeriod) {
        long cutoffTime = System.currentTimeMillis() - retentionPeriod;
        
        lock.writeLock().lock();
        try {
            timeIndex.headMap(cutoffTime).clear();
        } finally {
            lock.writeLock().unlock();
        }
    }
}

5. 성능 모니터링 및 장애 복구 시스템

5.1 메트릭 수집기 구현

@Slf4j
public class MetricsCollector {
    private final MeterRegistry registry;
    private final Map<String, Timer> operationTimers;
    private final Map<String, Counter> errorCounters;
    
    public MetricsCollector() {
        this.registry = new SimpleMeterRegistry();
        this.operationTimers = new ConcurrentHashMap<>();
        this.errorCounters = new ConcurrentHashMap<>();
        
        // 기본 메트릭 초기화
        initializeMetrics();
    }
    
    private void initializeMetrics() {
        // 작업 타이머
        operationTimers.put("read", Timer.builder("mussel.operation.read")
            .description("Read operation latency")
            .register(registry));
            
        operationTimers.put("write", Timer.builder("mussel.operation.write")
            .description("Write operation latency")
            .register(registry));
            
        // 에러 카운터
        errorCounters.put("read_error", Counter.builder("mussel.error.read")
            .description("Read operation errors")
            .register(registry));
            
        errorCounters.put("write_error", Counter.builder("mussel.error.write")
            .description("Write operation errors")
            .register(registry));
    }
    
    public void recordOperationLatency(String operation, long startTime) {
        Timer timer = operationTimers.get(operation);
        if (timer != null) {
            timer.record(System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS);
        }
    }
    
    public void incrementErrorCount(String operation) {
        Counter counter = errorCounters.get(operation + "_error");
        if (counter != null) {
            counter.increment();
        }
    }
    
    public MetricsSnapshot getMetricsSnapshot() {
        return new MetricsSnapshot(
            calculateAverageLatencies(),
            getErrorCounts(),
            registry.getMeters().stream()
                .collect(Collectors.toMap(
                    Meter::getId,
                    meter -> meter.measure().stream()
                        .mapToDouble(Measurement::getValue)
                        .sum()
                ))
        );
    }
}

5.2 장애 복구 관리자 구현

@Slf4j
public class FailoverManager {
    private final MusselClusterManager clusterManager;
    private final MetricsCollector metricsCollector;
    private final Map<String, NodeHealth> nodeHealthStatus;
    
    @Data
    @AllArgsConstructor
    private static class NodeHealth {
        private boolean isHealthy;
        private long lastHeartbeat;
        private int consecutiveFailures;
    }
    
    public void monitorNodeHealth() {
        while (true) {
            for (String nodeId : clusterManager.getActiveNodes()) {
                try {
                    checkNodeHealth(nodeId);
                } catch (Exception e) {
                    handleNodeFailure(nodeId, e);
                }
            }
            Thread.sleep(healthCheckInterval);
        }
    }
    
    private void checkNodeHealth(String nodeId) {
        NodeHealth health = nodeHealthStatus.get(nodeId);
        
        // 노드 메트릭 수집
        MetricsSnapshot metrics = metricsCollector.getMetricsSnapshot();
        
        // 상태 체크
        boolean isHealthy = isNodeHealthy(metrics);
        
        if (isHealthy) {
            health.setConsecutiveFailures(0);
            health.setLastHeartbeat(System.currentTimeMillis());
        } else {
            health.setConsecutiveFailures(health.getConsecutiveFailures() + 1);
            
            if (health.getConsecutiveFailures() >= failureThreshold) {
                initiateFailover(nodeId);
            }
        }
    }
    
    private void initiateFailover(String nodeId) {
        log.warn("Initiating failover for node: {}", nodeId);
        
        try {
            // 1. 노드를 클러스터에서 제거
            clusterManager.removeNode(nodeId);
            
            // 2. 샤드 재배치 시작
            redistributeShards(nodeId);
            
            // 3. 새 노드 프로비저닝 요청
            requestNewNode();
            
        } catch (Exception e) {
            log.error("Failover failed for node: " + nodeId, e);
            // 수동 개입 알림 발송
            alertOperators(nodeId, e);
        }
    }
    
    private void redistributeShards(String failedNodeId) {
        List<String> shardsToRedistribute = 
            clusterManager.getShardsForNode(failedNodeId);
            
        Map<String, Integer> nodeLoads = 
            clusterManager.getNodeLoads();
            
        for (String shardId : shardsToRedistribute) {
            String targetNode = findLeastLoadedNode(nodeLoads);
            clusterManager.moveShard(shardId, targetNode);
            nodeLoads.merge(targetNode, 1, Integer::sum);
        }
    }
}

5.3 성능 최적화 도구

public class PerformanceOptimizer {
    private final BlockCache blockCache;
    private final CompactionManager compactionManager;
    private final MetricsCollector metricsCollector;
    
    public void optimize() {
        // 캐시 히트율 분석
        analyzeCachePerformance();
        
        // 컴팩션 스케줄 최적화
        optimizeCompaction();
        
        // 메모리 사용량 최적화
        optimizeMemoryUsage();
    }
    
    private void analyzeCachePerformance() {
        CacheStats stats = blockCache.getStats();
        double hitRate = stats.hitRate();
        
        if (hitRate < 0.8) { // 히트율 80% 미만
            // 캐시 크기 조정
            long currentSize = blockCache.getMaxSize();
            long newSize = currentSize * 12 / 10; // 20% 증가
            blockCache.resize(newSize);
            
            // 캐시 워밍업
            warmupCache();
        }
    }
    
    private void optimizeCompaction() {
        MetricsSnapshot metrics = metricsCollector.getMetricsSnapshot();
        double readLatency = metrics.getAverageLatency("read");
        
        if (readLatency > 10.0) { // 10ms 이상
            // 컴팩션 임계값 조정
            compactionManager.adjustThreshold(0.7); // 더 공격적인 컴팩션
            
            // 우선순위가 높은 컴팩션 작업 스케줄링
            compactionManager.scheduleUrgentCompaction();
        }
    }
}

6. 운영 및 모니터링 인사이트

6.1 실제 운영 시 주요 고려사항

1. 데이터 일관성 관리

  • 리더리스 복제로 인한 일시적 불일치 허용
  • 최종 일관성 달성을 위한 백그라운드 조정
  • 버전 관리를 통한 충돌 해결

2. 성능 최적화

  • LSM 트리 컴팩션 전략
  • 블록 캐시 크기 조정
  • 메모리 테이블 플러시 타이밍

3. 장애 대응

  • 노드 실패 시 자동 복구
  • 데이터 재조정 전략
  • 모니터링 및 알림 설정

6.2 모니터링 지표

1. 시스템 건강도

  • CPU, 메모리, 디스크 사용률
  • 네트워크 대역폭
  • 오픈 파일 디스크립터

2. 성능 지표

  • 읽기/쓰기 지연시간 (P95, P99)
  • 초당 처리량
  • 캐시 히트율

3. 데이터 품질

  • 복제 지연
  • 불일치 발생률
  • 데이터 손실 여부

결론

Mussel 아키텍처는 대규모 분산 시스템에서 발생하는 다양한 도전 과제들을 효과적으로 해결하는 방법을 보여줍니다.

1. Apache Helix를 활용한 클러스터 관리로 안정적인 운영

2. Kafka 기반 리더리스 복제로 고가용성 확보

3. LSM 트리 기반 스토리지로 효율적인 데이터 관리

4. 실시간/배치 처리의 효과적인 통합

5. 강력한 모니터링 및 장애 복구 시스템

출처

profile
에러가 나도 괜찮아 — 그건 내가 배우고 있다는 증거야.

0개의 댓글