Spring Boot + Kafka Streams 연동해 구조 공부

송현진·2025년 4월 18일
0

Kafka

목록 보기
4/7

KafkaConfig

@Configuration
public class KafkaConfig {

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ConsumerFactory<String, LogMessageDto> logConsumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "log-group");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        config.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new JsonDeserializer<>(LogMessageDto.class, false));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, LogMessageDto> logKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, LogMessageDto> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(logConsumerFactory());
        return factory;
    }
    
    @Bean
    public ConsumerFactory<String, String> stringConsumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "string-group");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(config);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> stringKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(stringConsumerFactory());
        return factory;
    }
    
}    

Kafka Producer/Consumer 설정을 Bean으로 등록하는 설정 클래스이다.
Config의 설명은 블로그에서 확인해주길 바란다.


KafkaStreamsConfig

@Slf4j
@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfig {

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "json-streams-app-001");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.AT_LEAST_ONCE);

        return new KafkaStreamsConfiguration(props);
    }

    @Bean
    public KStream<String, LogMessageDto> jsonStream(StreamsBuilder builder) {
        JsonSerde<LogMessageDto> logEventSerde = new JsonSerde<>(LogMessageDto.class);

        KStream<String, LogMessageDto> stream = builder
                .stream("log-topic", Consumed.with(Serdes.String(), logEventSerde));

        stream
        		// "ERROR" 레벨만 필터링해서 error-log 토픽으로 전송
                .filter((key, value) -> value != null && "ERROR".equals(value.getLevel()))
                .peek((key, value) -> log.info("🚨 오류 로그 감지: {}", value))
                .to("error-log", Produced.with(Serdes.String(), new JsonSerde<>(LogMessageDto.class)));

        stream
        		//  key를 "error"로 고정해서 그룹화하고 5분 단위 윈도우(TimeWindows)를 설정하여 
                // "ERROR" 로그 발생 수를 집계
                .map((key, value) -> new KeyValue<>("error", value))
                .groupByKey(Grouped.with(Serdes.String(), logEventSerde))
                .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
                .count()
                // 윈도우 시작/끝 시간 + count 값을 로그로 출력하고
                // count 값을 문자열로 변환해서 "error-count-topic"으로 전송
                .toStream()
                .peek((windowedKey, count) -> {
                    String windowStart = windowedKey.window().startTime().toString();
                    String windowEnd = windowedKey.window().endTime().toString();
                    log.info("✅ {} ~ {} 동안 {}건의 ERROR 발생", windowStart, windowEnd, count);
                })
                .map((windowedKey, count) -> new KeyValue<>(windowedKey.key(), String.valueOf(count)))
                .to("error-count-topic", Produced.with(Serdes.String(), Serdes.String()));

        return stream;
    }
}    

@EnableKafkaStreams : Spring Kafka에서 Kafka Streams 애플리케이션을 자동으로 구성해주는 어노테이션이다.
DEFAULT_KEY_SERDE / VALUE_SERDE : 기본 직렬화/역직렬화 방식 지정. JSON 처리 시에는 따로 Serde 지정하기 때문에 String으로 둬도 된다.
AT_LEAST_ONCE: 최소 1번 이상 처리 보장한다. (기본값)


controller

@RestController
@RequiredArgsConstructor
@RequestMapping("/kafka")
public class KafkaController {

    private final KafkaProducerService kafkaProducerService;

    @PostMapping("/log")
    public ResponseEntity<String> sendLog(@RequestBody LogMessageDto message) {
        kafkaProducerService.sendLog("log-topic", message);
        return ResponseEntity.ok("카프카로 로그 메세지 전송 성공");
    }
}    

REST API를 통해 Kafka 메시지를 전송할 수 있도록 하는 Controller이다.


dto

@Getter
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@ToString
public class LogMessageDto {

    private String level;
    private String message;
    private LocalDateTime timestamp;

    @JsonCreator
    public LogMessageDto(@JsonProperty("level") String level,
                         @JsonProperty("message") String message,
                         @JsonProperty("timestamp") LocalDateTime timestamp) {
        this.level = level;
        this.message = message;
        this.timestamp = timestamp != null ? timestamp : LocalDateTime.now();
    }
}

메시지 객체이다.


📤 producerService

@Slf4j
@Service
@RequiredArgsConstructor
public class KafkaProducerService {

    private final KafkaTemplate<String, Object> kafkaTemplate;

    public void sendLog(String topic, LogMessageDto logMessageDto) {
        kafkaTemplate.send(topic, logMessageDto);
        log.info("[Producer] 로그 객체 전송 : " + logMessageDto);
    }
}    

Kafka 메시지를 실제로 발행하는 역할이다.


📥 consumerService

@Slf4j
@Component
public class KafkaLogConsumerService {

    @KafkaListener(topics = "error-log", groupId = "log-group", containerFactory = "logKafkaListenerContainerFactory")
    public void consume(LogMessageDto logMessageDto) {
        log.info("[Consumer] error-log 수신: {}", logMessageDto);
    }
}

Kafka에서 "error-log" 토픽을 수신하는 Consumer이다.
Kafka Streams에서 필터링된 메시지 error-log가 여기 들어온다.


📥 countConsumerService

@Slf4j
@Component
public class KafkaErrorCountConsumerService {

    @KafkaListener(topics = "error-count-topic", groupId = "log-group", containerFactory = "stringKafkaListenerContainerFactory")
    public void consumeErrorCount(String count) {
        log.info("🟢 error-count-topic 수신: {}건", count);
    }
}

Kafka에서 "error-count-topic" 토픽을 수신하는 Consumer이다.
Kafka Streams에서 필터링된 메시지 error-count-topic이 여기 들어온다.


출력 결과

INFO는 객체 전송은 되지만 ERROR만 필터링 되서 error-log로 보내기 때문에 Consumer에서 찍히진 않는다.



ERROR는 필터링 잘되서 error-log로 보내져 Consumer로 잘 전송되었다.

5분 단위로 집계된 ERROR 로그 수가 error-count-topic을 통해 잘 수신한다.


📝 배운점

오늘은 Kafka Streams를 활용한 실시간 로그 필터링 및 집계를 구현해봤다. KStream<String, Dto>에서 .filter() -> .groupByKey() -> .windowedBy() -> .count() -> .toStream() 흐름을 통해 실시간으로 집계 파이프 라인을 만들 수 있었다. 이런 스트림 처리 기능 덕분에 기존의 복잡한 컨슈머 로직 없이도 필터링, 변환, 집계를 체계적으로 처리할 수 있는 게 유용하다고 느꼈다.

추가로 주의할 점은 Kafka Producer 또는 StreamsConfig에서 사용하는 직렬화 클래스(Serde)와 Consumer가 받는 클래스가 일치하지 않으면 역직렬화 에러가 엄청 빠른 속도로 연속으로 발생하니 클래스 일치 여부를 꼭 확인해야 한다. 특히 DTO 구조 변경 시에는 Consumer, Streams, Producer 전체 흐름에서 타입 호환을 신중히 체크하며 구현해야한다.

profile
개발자가 되고 싶은 취준생

0개의 댓글