@Configuration
public class KafkaConfig {
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ConsumerFactory<String, LogMessageDto> logConsumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "log-group");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
config.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new JsonDeserializer<>(LogMessageDto.class, false));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, LogMessageDto> logKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, LogMessageDto> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(logConsumerFactory());
return factory;
}
@Bean
public ConsumerFactory<String, String> stringConsumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "string-group");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(config);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> stringKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(stringConsumerFactory());
return factory;
}
}
Kafka Producer/Consumer 설정을 Bean으로 등록하는 설정 클래스이다.
Config의 설명은 블로그에서 확인해주길 바란다.
@Slf4j
@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfig {
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "json-streams-app-001");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.AT_LEAST_ONCE);
return new KafkaStreamsConfiguration(props);
}
@Bean
public KStream<String, LogMessageDto> jsonStream(StreamsBuilder builder) {
JsonSerde<LogMessageDto> logEventSerde = new JsonSerde<>(LogMessageDto.class);
KStream<String, LogMessageDto> stream = builder
.stream("log-topic", Consumed.with(Serdes.String(), logEventSerde));
stream
// "ERROR" 레벨만 필터링해서 error-log 토픽으로 전송
.filter((key, value) -> value != null && "ERROR".equals(value.getLevel()))
.peek((key, value) -> log.info("🚨 오류 로그 감지: {}", value))
.to("error-log", Produced.with(Serdes.String(), new JsonSerde<>(LogMessageDto.class)));
stream
// key를 "error"로 고정해서 그룹화하고 5분 단위 윈도우(TimeWindows)를 설정하여
// "ERROR" 로그 발생 수를 집계
.map((key, value) -> new KeyValue<>("error", value))
.groupByKey(Grouped.with(Serdes.String(), logEventSerde))
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
.count()
// 윈도우 시작/끝 시간 + count 값을 로그로 출력하고
// count 값을 문자열로 변환해서 "error-count-topic"으로 전송
.toStream()
.peek((windowedKey, count) -> {
String windowStart = windowedKey.window().startTime().toString();
String windowEnd = windowedKey.window().endTime().toString();
log.info("✅ {} ~ {} 동안 {}건의 ERROR 발생", windowStart, windowEnd, count);
})
.map((windowedKey, count) -> new KeyValue<>(windowedKey.key(), String.valueOf(count)))
.to("error-count-topic", Produced.with(Serdes.String(), Serdes.String()));
return stream;
}
}
@EnableKafkaStreams
: Spring Kafka에서 Kafka Streams 애플리케이션을 자동으로 구성해주는 어노테이션이다.
DEFAULT_KEY_SERDE / VALUE_SERDE
: 기본 직렬화/역직렬화 방식 지정. JSON 처리 시에는 따로 Serde
지정하기 때문에 String
으로 둬도 된다.
AT_LEAST_ONCE
: 최소 1번 이상 처리 보장한다. (기본값)
@RestController
@RequiredArgsConstructor
@RequestMapping("/kafka")
public class KafkaController {
private final KafkaProducerService kafkaProducerService;
@PostMapping("/log")
public ResponseEntity<String> sendLog(@RequestBody LogMessageDto message) {
kafkaProducerService.sendLog("log-topic", message);
return ResponseEntity.ok("카프카로 로그 메세지 전송 성공");
}
}
REST API를 통해 Kafka 메시지를 전송할 수 있도록 하는 Controller이다.
@Getter
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@ToString
public class LogMessageDto {
private String level;
private String message;
private LocalDateTime timestamp;
@JsonCreator
public LogMessageDto(@JsonProperty("level") String level,
@JsonProperty("message") String message,
@JsonProperty("timestamp") LocalDateTime timestamp) {
this.level = level;
this.message = message;
this.timestamp = timestamp != null ? timestamp : LocalDateTime.now();
}
}
메시지 객체이다.
@Slf4j
@Service
@RequiredArgsConstructor
public class KafkaProducerService {
private final KafkaTemplate<String, Object> kafkaTemplate;
public void sendLog(String topic, LogMessageDto logMessageDto) {
kafkaTemplate.send(topic, logMessageDto);
log.info("[Producer] 로그 객체 전송 : " + logMessageDto);
}
}
Kafka 메시지를 실제로 발행하는 역할이다.
@Slf4j
@Component
public class KafkaLogConsumerService {
@KafkaListener(topics = "error-log", groupId = "log-group", containerFactory = "logKafkaListenerContainerFactory")
public void consume(LogMessageDto logMessageDto) {
log.info("[Consumer] error-log 수신: {}", logMessageDto);
}
}
Kafka에서 "error-log" 토픽을 수신하는 Consumer이다.
Kafka Streams에서 필터링된 메시지 error-log가 여기 들어온다.
@Slf4j
@Component
public class KafkaErrorCountConsumerService {
@KafkaListener(topics = "error-count-topic", groupId = "log-group", containerFactory = "stringKafkaListenerContainerFactory")
public void consumeErrorCount(String count) {
log.info("🟢 error-count-topic 수신: {}건", count);
}
}
Kafka에서 "error-count-topic" 토픽을 수신하는 Consumer이다.
Kafka Streams에서 필터링된 메시지 error-count-topic이 여기 들어온다.
INFO
는 객체 전송은 되지만 ERROR
만 필터링 되서 error-log
로 보내기 때문에 Consumer에서 찍히진 않는다.
ERROR
는 필터링 잘되서 error-log
로 보내져 Consumer로 잘 전송되었다.
5분 단위로 집계된 ERROR
로그 수가 error-count-topic
을 통해 잘 수신한다.
오늘은 Kafka Streams를 활용한 실시간 로그 필터링 및 집계를 구현해봤다. KStream<String, Dto>
에서 .filter()
-> .groupByKey()
-> .windowedBy()
-> .count()
-> .toStream()
흐름을 통해 실시간으로 집계 파이프 라인을 만들 수 있었다. 이런 스트림 처리 기능 덕분에 기존의 복잡한 컨슈머 로직 없이도 필터링, 변환, 집계를 체계적으로 처리할 수 있는 게 유용하다고 느꼈다.
추가로 주의할 점은 Kafka Producer
또는 StreamsConfig
에서 사용하는 직렬화 클래스(Serde)와 Consumer가 받는 클래스가 일치하지 않으면 역직렬화 에러가 엄청 빠른 속도로 연속으로 발생하니 클래스 일치 여부를 꼭 확인해야 한다. 특히 DTO 구조 변경 시에는 Consumer
, Streams
, Producer
전체 흐름에서 타입 호환을 신중히 체크하며 구현해야한다.