
마주했던 가장 큰 도전 과제였던 대규모 이벤트 처리 시스템 구축 경험을 공유하고자 합니다.
특히 일일 1천만 건의 공공기관 콜센터 상담 이벤트를 안정적으로 처리하면서도 비용을 효율적으로 관리하는 방법에 대해 실제 사례를 중심으로 설명하겠습니다.
flowchart LR
Client([클라이언트]) --> LoadBalancer[로드밸런서]
LoadBalancer --> RequestRecorder[요청기록기]
RequestRecorder --> PubSub[(Pub/Sub)]
PubSub --> LogProcessor[로그처리기]
LogProcessor --> Router[라우터]
Router --> AnalyticsDB[(분석 DB)]
Router --> Storage[(GCS 저장소)]
style Client fill:#f9f,stroke:#333,stroke-width:4px
style PubSub fill:#bbf,stroke:#333,stroke-width:2px
style AnalyticsDB fill:#bbf,stroke:#333,stroke-width:2px
style Storage fill:#bbf,stroke:#333,stroke-width:2px
@Service
public class EventProcessorService {
private final KafkaTemplate<String, Event> kafkaTemplate;
private final ObjectMapper objectMapper;
private static final String TOPIC = "event-stream";
@Async
public CompletableFuture<Boolean> processEvent(Event event) {
try {
// 이벤트 유효성 검증
validateEvent(event);
// 중복 제거를 위한 이벤트 ID 생성
String eventId = generateEventId(event);
event.setEventId(eventId);
// 이벤트 압축
byte[] compressedEvent = compressEvent(event);
// Kafka로 전송
ProducerRecord<String, Event> record =
new ProducerRecord<>(TOPIC, eventId, event);
return CompletableFuture.completedFuture(true);
} catch (Exception e) {
log.error("Event processing failed", e);
return CompletableFuture.completedFuture(false);
}
}
private byte[] compressEvent(Event event) throws IOException {
byte[] serializedEvent = objectMapper.writeValueAsBytes(event);
return ZstdCompressor.compress(serializedEvent);
}
}
@Component
public class ZstdCompressor {
private static final int COMPRESSION_LEVEL = 3;
public static byte[] compress(byte[] data) {
Encoder encoder = new Encoder();
encoder.setParameter(Encoder.Parameter.compressionLevel, COMPRESSION_LEVEL);
return encoder.encode(data);
}
@Scheduled(fixedRate = 3600000) // 1시간마다 실행
public void compressBatchData() {
List<Event> events = eventRepository.findUncompressedEvents();
Map<String, Event> uniqueEvents = new HashMap<>();
// 중복 제거
for (Event event : events) {
String key = generateEventKey(event);
uniqueEvents.putIfAbsent(key, event);
}
// 압축 및 저장
for (Event event : uniqueEvents.values()) {
byte[] compressed = compress(serialize(event));
storageService.store(compressed);
}
}
}
@Service
public class StorageOptimizationService {
private final GCSStorage gcsStorage;
public void optimizeStorage() {
// 콜드 스토리지로 이동할 데이터 식별
List<StorageObject> oldObjects =
gcsStorage.listObjects(
Storage.BlobListOption.prefix("events/"),
Storage.BlobListOption.currentDirectory());
for (StorageObject object : oldObjects) {
if (isEligibleForColdStorage(object)) {
moveToArchiveStorage(object);
}
}
}
private boolean isEligibleForColdStorage(StorageObject object) {
// 3개월 이상 된 데이터 확인
return object.getCreateTime().plusMonths(3)
.isBefore(Instant.now());
}
}



성능 향상: 동기식 처리 대비 처리량이 10배 이상 증가
확장성 개선: 트래픽 증가에 따른 유연한 스케일링 가능
리소스 효율성: CPU와 메모리 사용률 최적화
압축 기술 도입 효과
중복 제거 시스템
@Service
public class AnomalyDetectionService {
private final MLModel mlModel;
public void detectAnomalies(Stream<Event> events) {
// 실시간 데이터 스트림 분석
events.parallel()
.map(this::extractFeatures)
.filter(features -> mlModel.predict(features) > threshold)
.forEach(this::triggerAlert);
}
}
스트리밍 분석 파이프라인
고급 분석 기능
global:
regions:
- name: asia-northeast
priority: 1
resources:
cpu: 1000m
memory: 2Gi
- name: us-west
priority: 2
resources:
cpu: 800m
memory: 1.5Gi