Chronicle Map & Chronicle Queue

방지환·2026년 1월 22일

Java

목록 보기
18/19

개요

Chronicle 시리즈는 OpenHFT(High Frequency Trading)에서 개발한 오픈소스 라이브러리로, 초저지연(ultra-low latency)이 필요한 금융권, 실시간 시스템에서 널리 사용됩니다.


Chronicle Map

특징

  • Off-Heap Memory 기반 Key-Value 저장소
  • Memory-Mapped File 사용으로 초고속 I/O
  • JVM 힙 메모리 독립적: GC의 영향을 받지 않음
  • 프로세스 간 공유 가능: 여러 JVM 프로세스가 동일한 맵을 공유
  • 트랜잭션 없음: 단순하고 빠른 구조

사용 사례

  • 고성능 캐시 시스템
  • 세션 저장소
  • 실시간 가격 정보 저장
  • 메타데이터 저장소
  • IPC(Inter-Process Communication)

Chronicle Map 설치

Maven

<dependency>
    <groupId>net.openhft</groupId>
    <artifactId>chronicle-map</artifactId>
    <version>3.24ea3</version>
</dependency>

Gradle

implementation 'net.openhft:chronicle-map:3.24ea3'

Chronicle Map 기본 사용법

1. 기본 생성 및 사용

import net.openhft.chronicle.map.ChronicleMap;
import java.io.File;
import java.io.IOException;

public class ChronicleMapBasicExample {
    public static void main(String[] args) throws IOException {
        File file = new File("user-map.dat");
        
        // Chronicle Map 생성
        try (ChronicleMap<String, String> map = ChronicleMap
                .of(String.class, String.class)
                .name("user-cache")
                .entries(1_000_000)              // 예상 엔트리 수
                .averageKeySize(20)               // 평균 키 크기 (bytes)
                .averageValueSize(100)            // 평균 값 크기 (bytes)
                .createPersistedTo(file)) {       // 파일에 영구 저장
            
            // 데이터 쓰기
            map.put("user:1001", "홍길동");
            map.put("user:1002", "김철수");
            map.put("user:1003", "이영희");
            
            // 데이터 읽기
            String user = map.get("user:1001");
            System.out.println("User: " + user);
            
            // 존재 여부 확인
            if (map.containsKey("user:1002")) {
                System.out.println("사용자 존재함");
            }
            
            // 삭제
            map.remove("user:1003");
            
            // 크기 확인
            System.out.println("Total entries: " + map.size());
        }
    }
}

2. 복합 객체 저장

import net.openhft.chronicle.map.ChronicleMap;
import net.openhft.chronicle.core.values.LongValue;
import net.openhft.chronicle.values.Values;

import java.io.File;
import java.io.Serializable;

// Value 객체 정의
class User implements Serializable {
    private static final long serialVersionUID = 1L;
    
    private String name;
    private int age;
    private String email;
    
    // 생성자, getter, setter 생략
    
    public User(String name, int age, String email) {
        this.name = name;
        this.age = age;
        this.email = email;
    }
    
    // toString, equals, hashCode 구현 필요
}

public class ChronicleMapObjectExample {
    public static void main(String[] args) throws Exception {
        File file = new File("user-object-map.dat");
        
        try (ChronicleMap<Integer, User> map = ChronicleMap
                .of(Integer.class, User.class)
                .name("user-object-map")
                .entries(100_000)
                .averageValue(new User("Sample", 30, "sample@example.com"))
                .createPersistedTo(file)) {
            
            // 객체 저장
            map.put(1001, new User("홍길동", 25, "hong@example.com"));
            map.put(1002, new User("김철수", 30, "kim@example.com"));
            
            // 객체 조회
            User user = map.get(1001);
            System.out.println("User: " + user.name + ", Age: " + user.age);
        }
    }
}

3. 프로세스 간 공유

import net.openhft.chronicle.map.ChronicleMap;
import java.io.File;

// 프로세스 1: Writer
public class MapWriter {
    public static void main(String[] args) throws Exception {
        File file = new File("/tmp/shared-map.dat");
        
        try (ChronicleMap<String, String> map = ChronicleMap
                .of(String.class, String.class)
                .entries(10_000)
                .averageKeySize(10)
                .averageValueSize(50)
                .createPersistedTo(file)) {
            
            int counter = 0;
            while (true) {
                map.put("counter", String.valueOf(counter++));
                System.out.println("Written: " + counter);
                Thread.sleep(1000);
            }
        }
    }
}

// 프로세스 2: Reader
public class MapReader {
    public static void main(String[] args) throws Exception {
        File file = new File("/tmp/shared-map.dat");
        
        try (ChronicleMap<String, String> map = ChronicleMap
                .of(String.class, String.class)
                .entries(10_000)
                .averageKeySize(10)
                .averageValueSize(50)
                .createPersistedTo(file)) {
            
            while (true) {
                String value = map.get("counter");
                System.out.println("Read: " + value);
                Thread.sleep(1000);
            }
        }
    }
}

Chronicle Queue

특징

  • 초저지연 메시지 큐: 마이크로초 단위 지연시간
  • 영구 저장소: 모든 메시지가 디스크에 저장됨
  • 순차 쓰기 최적화: 로그 구조로 매우 빠른 쓰기 성능
  • 메모리 효율적: Off-heap 메모리 사용
  • Tailer/Appender 패턴: Producer/Consumer 모델
  • 무손실: 시스템 장애 시에도 데이터 유실 없음

사용 사례

  • 고성능 이벤트 소싱
  • 트레이딩 시스템 주문 처리
  • 로그 수집 및 처리
  • 마이크로서비스 간 통신
  • 감사(Audit) 로그

Chronicle Queue 설치

Maven

<dependency>
    <groupId>net.openhft</groupId>
    <artifactId>chronicle-queue</artifactId>
    <version>5.24ea17</version>
</dependency>

Gradle

implementation 'net.openhft:chronicle-queue:5.24ea17'

Chronicle Queue 기본 사용법

1. 기본 Producer/Consumer

import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;

import java.io.File;

public class ChronicleQueueBasicExample {
    
    // Producer
    public static void produceMessages() {
        String path = "/tmp/chronicle-queue";
        
        try (ChronicleQueue queue = SingleChronicleQueueBuilder
                .binary(path)
                .build()) {
            
            ExcerptAppender appender = queue.acquireAppender();
            
            for (int i = 0; i < 10; i++) {
                String message = "Message " + i;
                appender.writeText(message);
                System.out.println("Produced: " + message);
            }
        }
    }
    
    // Consumer
    public static void consumeMessages() {
        String path = "/tmp/chronicle-queue";
        
        try (ChronicleQueue queue = SingleChronicleQueueBuilder
                .binary(path)
                .build()) {
            
            ExcerptTailer tailer = queue.createTailer();
            
            while (true) {
                String message = tailer.readText();
                if (message == null) {
                    break; // 더 이상 메시지가 없음
                }
                System.out.println("Consumed: " + message);
            }
        }
    }
    
    public static void main(String[] args) {
        produceMessages();
        consumeMessages();
    }
}

2. 구조화된 메시지 처리

import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import net.openhft.chronicle.wire.DocumentContext;

// 메시지 인터페이스 정의
interface OrderEvent {
    void orderId(long orderId);
    long orderId();
    
    void symbol(String symbol);
    String symbol();
    
    void quantity(int quantity);
    int quantity();
    
    void price(double price);
    double price();
}

public class ChronicleQueueStructuredExample {
    
    public static void main(String[] args) {
        String path = "/tmp/order-queue";
        
        // Producer
        try (ChronicleQueue queue = SingleChronicleQueueBuilder
                .binary(path)
                .build()) {
            
            ExcerptAppender appender = queue.acquireAppender();
            
            // 주문 이벤트 쓰기
            try (DocumentContext dc = appender.writingDocument()) {
                OrderEvent event = dc.wire().getValueOut().object(OrderEvent.class);
                event.orderId(12345);
                event.symbol("AAPL");
                event.quantity(100);
                event.price(150.50);
            }
            
            System.out.println("Order event written");
        }
        
        // Consumer
        try (ChronicleQueue queue = SingleChronicleQueueBuilder
                .binary(path)
                .build()) {
            
            ExcerptTailer tailer = queue.createTailer();
            
            // 주문 이벤트 읽기
            try (DocumentContext dc = tailer.readingDocument()) {
                if (dc.isPresent()) {
                    OrderEvent event = dc.wire().getValueIn().object(OrderEvent.class);
                    System.out.printf("Order: ID=%d, Symbol=%s, Qty=%d, Price=%.2f%n",
                        event.orderId(), event.symbol(), 
                        event.quantity(), event.price());
                }
            }
        }
    }
}

3. 실시간 Tailer (Blocking Read)

import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;

import java.util.concurrent.TimeUnit;

public class ChronicleQueueRealtimeExample {
    
    private static final String QUEUE_PATH = "/tmp/realtime-queue";
    
    // 실시간 Producer 스레드
    static class Producer implements Runnable {
        @Override
        public void run() {
            try (ChronicleQueue queue = SingleChronicleQueueBuilder
                    .binary(QUEUE_PATH)
                    .build()) {
                
                ExcerptAppender appender = queue.acquireAppender();
                int counter = 0;
                
                while (!Thread.currentThread().isInterrupted()) {
                    String message = "Message-" + counter++;
                    appender.writeText(message);
                    System.out.println("[Producer] " + message);
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
    
    // 실시간 Consumer 스레드
    static class Consumer implements Runnable {
        @Override
        public void run() {
            try (ChronicleQueue queue = SingleChronicleQueueBuilder
                    .binary(QUEUE_PATH)
                    .build()) {
                
                ExcerptTailer tailer = queue.createTailer();
                
                while (!Thread.currentThread().isInterrupted()) {
                    String message = tailer.readText();
                    if (message != null) {
                        System.out.println("[Consumer] " + message);
                    } else {
                        // 메시지가 없으면 잠시 대기
                        TimeUnit.MILLISECONDS.sleep(10);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
    
    public static void main(String[] args) throws InterruptedException {
        Thread producerThread = new Thread(new Producer());
        Thread consumerThread = new Thread(new Consumer());
        
        consumerThread.start();
        Thread.sleep(100); // Consumer가 먼저 시작되도록
        producerThread.start();
        
        // 10초 동안 실행
        Thread.sleep(10000);
        
        producerThread.interrupt();
        consumerThread.interrupt();
        
        producerThread.join();
        consumerThread.join();
    }
}

4. Named Tailer (여러 Consumer 지원)

import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;

public class ChronicleQueueNamedTailerExample {
    
    private static final String QUEUE_PATH = "/tmp/multi-consumer-queue";
    
    public static void main(String[] args) throws InterruptedException {
        // Producer: 메시지 생성
        try (ChronicleQueue queue = SingleChronicleQueueBuilder
                .binary(QUEUE_PATH)
                .build()) {
            
            var appender = queue.acquireAppender();
            for (int i = 0; i < 10; i++) {
                appender.writeText("Event-" + i);
            }
        }
        
        // Consumer 1: 전체 메시지 처리
        Thread consumer1 = new Thread(() -> {
            try (ChronicleQueue queue = SingleChronicleQueueBuilder
                    .binary(QUEUE_PATH)
                    .build()) {
                
                ExcerptTailer tailer = queue.createTailer("consumer-1");
                
                String msg;
                while ((msg = tailer.readText()) != null) {
                    System.out.println("[Consumer-1] " + msg);
                    Thread.sleep(200);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        
        // Consumer 2: 독립적으로 전체 메시지 처리
        Thread consumer2 = new Thread(() -> {
            try (ChronicleQueue queue = SingleChronicleQueueBuilder
                    .binary(QUEUE_PATH)
                    .build()) {
                
                ExcerptTailer tailer = queue.createTailer("consumer-2");
                
                String msg;
                while ((msg = tailer.readText()) != null) {
                    System.out.println("[Consumer-2] " + msg);
                    Thread.sleep(300);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        
        consumer1.start();
        consumer2.start();
        
        consumer1.join();
        consumer2.join();
    }
}

Chronicle Map vs Chronicle Queue 비교

특성Chronicle MapChronicle Queue
데이터 구조Key-Value 저장소메시지 큐 (순차 로그)
접근 패턴랜덤 액세스순차 읽기/쓰기
데이터 수명명시적 삭제 전까지 유지보관 정책에 따라 자동 삭제 가능
주요 용도캐시, 세션 저장소이벤트 소싱, 메시징
트랜잭션없음없음 (단순 append)
프로세스 간 공유가능가능
성능초고속 read/write초고속 sequential write

실전 성능 최적화 팁

Chronicle Map 최적화

// 1. 적절한 entries 크기 설정 (실제 사용량의 1.5배 권장)
ChronicleMap.of(String.class, String.class)
    .entries(1_500_000)  // 예상 100만 개면 150만으로 설정
    
// 2. actualChunkSize로 메모리 효율 개선
ChronicleMap.of(String.class, String.class)
    .entries(1_000_000)
    .actualChunkSize(128)  // 작은 청크 크기
    
// 3. putIfAbsent 활용 (동시성 제어)
map.putIfAbsent("key", "value");

// 4. 대량 데이터 처리 시 배치
for (Map.Entry<String, String> entry : batchData.entrySet()) {
    map.put(entry.getKey(), entry.getValue());
}

Chronicle Queue 최적화

// 1. Roll Cycle 설정 (파일 롤링 주기)
SingleChronicleQueueBuilder.binary(path)
    .rollCycle(RollCycles.HOURLY)  // 시간별 파일 생성
    
// 2. BlockSize 조정
SingleChronicleQueueBuilder.binary(path)
    .blockSize(256 << 20)  // 256MB 블록
    
// 3. 배치 쓰기
ExcerptAppender appender = queue.acquireAppender();
for (String msg : messages) {
    appender.writeText(msg);
}

// 4. Tailer 재사용
ExcerptTailer tailer = queue.createTailer("my-consumer");
// 프로그램 종료 시까지 재사용

주의사항

Chronicle Map

  1. entries 크기 부족: 지정한 크기를 초과하면 성능이 급격히 저하됩니다.
  2. Key/Value 크기: averageKeySize, averageValueSize를 정확히 설정해야 합니다.
  3. 파일 잠금: Windows에서 파일이 사용 중일 때 삭제 불가능합니다.
  4. GC 독립: Off-heap이므로 GC 모니터링에 나타나지 않습니다.

Chronicle Queue

  1. 디스크 공간: 메시지가 계속 쌓이므로 주기적인 정리가 필요합니다.
  2. 순차 읽기: 랜덤 액세스가 필요하면 부적합합니다.
  3. Named Tailer: 각 tailer는 독립적인 위치를 추적하므로 관리가 필요합니다.
  4. 파일 정리: 오래된 큐 파일을 수동으로 삭제해야 할 수 있습니다.

실제 사용 예제: 주문 처리 시스템

import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import net.openhft.chronicle.map.ChronicleMap;

import java.io.File;

public class OrderProcessingSystem {
    
    private final ChronicleQueue orderQueue;
    private final ChronicleMap<Long, String> orderStatusMap;
    
    public OrderProcessingSystem() throws Exception {
        // 주문 큐 초기화
        orderQueue = SingleChronicleQueueBuilder
            .binary("/data/orders")
            .build();
        
        // 주문 상태 맵 초기화
        orderStatusMap = ChronicleMap
            .of(Long.class, String.class)
            .entries(10_000_000)
            .averageKeySize(8)
            .averageValueSize(20)
            .createPersistedTo(new File("/data/order-status.dat"));
    }
    
    // 주문 접수
    public void submitOrder(long orderId, String orderDetails) {
        ExcerptAppender appender = orderQueue.acquireAppender();
        appender.writeText(orderId + ":" + orderDetails);
        
        // 상태 업데이트
        orderStatusMap.put(orderId, "PENDING");
        
        System.out.println("Order submitted: " + orderId);
    }
    
    // 주문 처리
    public void processOrders() {
        ExcerptTailer tailer = orderQueue.createTailer("order-processor");
        
        String order;
        while ((order = tailer.readText()) != null) {
            String[] parts = order.split(":");
            long orderId = Long.parseLong(parts[0]);
            String details = parts[1];
            
            // 주문 처리 로직
            processOrder(orderId, details);
            
            // 상태 업데이트
            orderStatusMap.put(orderId, "COMPLETED");
            
            System.out.println("Order processed: " + orderId);
        }
    }
    
    private void processOrder(long orderId, String details) {
        // 실제 주문 처리 로직
        try {
            Thread.sleep(100); // 처리 시뮬레이션
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    
    // 주문 상태 조회
    public String getOrderStatus(long orderId) {
        return orderStatusMap.get(orderId);
    }
    
    public void close() {
        orderQueue.close();
        orderStatusMap.close();
    }
}

결론

Chronicle Map 선택 시점

  • 빠른 캐시가 필요할 때
  • Key-Value 조회가 빈번할 때
  • 프로세스 간 데이터 공유가 필요할 때
  • GC 영향을 최소화해야 할 때

Chronicle Queue 선택 시점

  • 이벤트 소싱 패턴 구현 시
  • 초저지연 메시징이 필요할 때
  • 메시지 순서 보장이 중요할 때
  • 감사 로그 등 영구 저장이 필요할 때

0개의 댓글