Kafka + Spring

이준연·2026년 3월 27일

1. Kafka 핵심 개념

용어설명
ProducerKafka에 메시지를 보내는 클라이언트. 특정 Topic에 메시지를 기록한다.
ConsumerKafka에서 메시지를 읽어오는 클라이언트. 한 개 이상의 Topic을 구독한다.
Topic메시지를 저장하는 논리적 이름 공간. 로그처럼 순서대로 쌓인다.
PartitionTopic을 나눈 물리적 단위. 파티션 수만큼 병렬 처리가 가능하다.
Consumer Group같은 Topic을 나눠서 읽는 Consumer들의 묶음. Offset은 그룹 단위로 관리된다.
Offset파티션 안 메시지의 위치 번호(0, 1, 2, …). "어디까지 읽었는지"의 기준이다.

2. Docker로 Kafka 클러스터 띄우기

KRaft 모드(Zookeeper 없이) 3-브로커 클러스터 + Kafka UI

# docker-compose.yml
version: '3.8'

services:
  kafka-1:
    image: apache/kafka:3.7.0
    container_name: kafka-1
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093
      KAFKA_LISTENERS: CONTROLLER://:9093,INTERNAL://:29092,EXTERNAL://:9092
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-1:29092,EXTERNAL://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_LOG_DIRS: /var/lib/kafka/data
    volumes:
      - kafka1_data:/var/lib/kafka/data
    networks:
      - kafka-net

  kafka-2:
    image: apache/kafka:3.7.0
    container_name: kafka-2
    ports:
      - "9093:9092"
    environment:
      KAFKA_NODE_ID: 2
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093
      KAFKA_LISTENERS: CONTROLLER://:9093,INTERNAL://:29092,EXTERNAL://:9092
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-2:29092,EXTERNAL://localhost:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_LOG_DIRS: /var/lib/kafka/data
    volumes:
      - kafka2_data:/var/lib/kafka/data
    networks:
      - kafka-net

  kafka-3:
    image: apache/kafka:3.7.0
    container_name: kafka-3
    ports:
      - "9094:9092"
    environment:
      KAFKA_NODE_ID: 3
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093
      KAFKA_LISTENERS: CONTROLLER://:9093,INTERNAL://:29092,EXTERNAL://:9092
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-3:29092,EXTERNAL://localhost:9094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_LOG_DIRS: /var/lib/kafka/data
    volumes:
      - kafka3_data:/var/lib/kafka/data
    networks:
      - kafka-net

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    ports:
      - "8088:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local-kraft-cluster
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka-1:29092,kafka-2:29092,kafka-3:29092
      DYNAMIC_CONFIG_ENABLED: true
    depends_on:
      - kafka-1
      - kafka-2
      - kafka-3
    networks:
      - kafka-net

volumes:
  kafka1_data:
  kafka2_data:
  kafka3_data:

networks:
  kafka-net:
    driver: bridge
docker-compose up -d
# Kafka UI → http://localhost:8088

3. 의존성 추가 & 기본 설정

build.gradle

dependencies {
    implementation 'org.springframework.kafka:spring-kafka'
}

application.yml

spring:
  kafka:
    bootstrap-servers: localhost:9092,localhost:9093,localhost:9094

4. KafkaConfig — Producer / Consumer 설정

Spring Kafka의 핵심 설정을 한눈에 정리하면 이렇습니다.

ProducerFactory  →  KafkaTemplate           (메시지 전송)
ConsumerFactory  →  ListenerContainerFactory  →  @KafkaListener  (메시지 수신)
@EnableKafka
@Configuration
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    // ── Producer ──────────────────────────────────────────────────────────

    @Bean
    public ProducerFactory<String, String> stringProducerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,   StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, String> stringKafkaTemplate() {
        return new KafkaTemplate<>(stringProducerFactory());
    }

    // ── Consumer ──────────────────────────────────────────────────────────

    @Bean
    public ConsumerFactory<String, String> stringConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,        bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG,                  "simple-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,   StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,        "earliest");
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String>
    stringKafkaListenerContainerFactory() {
        var factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
        factory.setConsumerFactory(stringConsumerFactory());
        return factory;
    }
}

AUTO_OFFSET_RESET_CONFIG 옵션

  • earliest : Consumer Group이 처음 시작할 때 토픽 맨 앞부터 읽는다.
  • latest : 이 Consumer Group이 연결된 이후에 들어오는 메시지부터 읽는다.

5. String 메시지 Producer & Listener

Producer

@Service
@RequiredArgsConstructor
public class SimpleMessageProducer {

    private static final String TOPIC = "simple-messages";
    private final KafkaTemplate<String, String> stringKafkaTemplate;

    public void send(String message) {
        stringKafkaTemplate.send(TOPIC, message);
    }
}

Listener

@Slf4j
@Component
public class SimpleMessageListener {

    @KafkaListener(
        topics          = "simple-messages",
        groupId         = "simple-group",
        containerFactory = "stringKafkaListenerContainerFactory"
    )
    public void consume(String message) {
        log.info("받은 메시지: {}", message);
    }
}

@KafkaListener의 세 가지 필수 속성

  • topics : 구독할 토픽 이름
  • groupId : Consumer Group 이름 (Config의 GROUP_ID_CONFIG와 일치시킨다)
  • containerFactory : Config에서 만든 ListenerContainerFactory 빈 이름

6. 객체(DTO)를 Kafka로 주고받기

문자열 대신 Java 객체를 Kafka로 주고받으려면 JSON 직렬화/역직렬화 설정이 필요합니다.

이벤트 클래스

@Getter
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class SimpleEvent {
    private String        message;
    private String        worker;
    private LocalDateTime createdAt;
}

Config 추가 — JsonSerializer / JsonDeserializer

// Producer 설정
@Bean
public ProducerFactory<String, SimpleEvent> eventProducerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,        bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,     StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,   JsonSerializer.class); // JSON 직렬화
    return new DefaultKafkaProducerFactory<>(props);
}

@Bean
public KafkaTemplate<String, SimpleEvent> eventKafkaTemplate() {
    return new KafkaTemplate<>(eventProducerFactory());
}

// Consumer 설정
@Bean
public ConsumerFactory<String, SimpleEvent> eventConsumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,        bootstrapServers);
    props.put(ConsumerConfig.GROUP_ID_CONFIG,                  "simple-event-group");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,   StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,        "earliest");

    JsonDeserializer<SimpleEvent> deserializer = new JsonDeserializer<>(SimpleEvent.class);
    // 역직렬화를 허용할 패키지 지정 (보안 장치)
    deserializer.addTrustedPackages("com.example.kafka.event");

    return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), deserializer);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, SimpleEvent>
eventKafkaListenerContainerFactory() {
    var factory = new ConcurrentKafkaListenerContainerFactory<String, SimpleEvent>();
    factory.setConsumerFactory(eventConsumerFactory());
    return factory;
}

Producer & Listener

// Producer
@Service
@RequiredArgsConstructor
public class SimpleEventProducer {

    private static final String TOPIC = "simple-events";
    private final KafkaTemplate<String, SimpleEvent> eventKafkaTemplate;

    public void send(SimpleEvent event) {
        eventKafkaTemplate.send(TOPIC, event); // JsonSerializer가 자동으로 JSON 변환
    }
}

// Listener
@Slf4j
@Component
public class SimpleEventListener {

    @KafkaListener(
        topics           = "simple-events",
        groupId          = "simple-event-group",
        containerFactory = "eventKafkaListenerContainerFactory"
    )
    public void consume(SimpleEvent event) { // JsonDeserializer가 자동으로 객체 변환
        log.info("받은 이벤트: {}", event);
    }
}

7. Offset과 Commit — 데이터 재처리 제어

Offset이란?

파티션 안 각 메시지에 붙는 순서 번호입니다. Consumer는 (Consumer Group, Partition) 조합으로 "어디까지 읽었는지"를 관리합니다.

group-a / partition-0 → offset 5  (5번까지 읽음)
group-b / partition-0 → offset 1  (1번까지 읽음)

Consumer Group이 다르면 같은 토픽이라도 Offset은 완전히 독립적입니다.

Commit이란?

"이 Offset까지 읽었습니다"를 Kafka에 기록하는 행위입니다. Consumer가 재시작되면 마지막으로 Commit된 Offset의 다음 번호부터 읽습니다.

상황문제
Commit을 너무 빨리처리 전에 "읽었다"고 기록 → 데이터 유실
Commit을 너무 늦게이미 처리한 메시지를 다시 읽음 → 중복 처리

Kafka의 기본 철학은 at-least-once (조금 중복되더라도, 최소 한 번은 꼭 전달)입니다.

Spring Kafka는 @KafkaListener 뒤에서 리스너 컨테이너가 자동으로 Offset을 Commit 해주기 때문에, 별도 코드 없이도 재시작 시 중복 수신이 발생하지 않습니다.


8. 수동 Commit (Acknowledgment)

처리 성공을 확인한 뒤 직접 Commit하고 싶을 때 사용합니다.

Config 추가

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
manualAckKafkaListenerContainerFactory() {
    var factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
    factory.setConsumerFactory(manualConsumerFactory());
    // AckMode를 MANUAL로 설정
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
    return factory;
}

Listener

@Slf4j
@Component
public class ManualAckConsumerListener {

    @KafkaListener(
        topics           = "simple-messages",
        groupId          = "manual-ack-group",
        containerFactory = "manualAckKafkaListenerContainerFactory"
    )
    public void consume(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String message = record.value();
        log.info("[manual-ack] offset={}, message={}", record.offset(), message);

        if (message.equals("error")) {
            throw new RuntimeException("처리 실패 — Commit 하지 않음");
        }

        ack.acknowledge(); // ← 여기서 Commit
    }
}

주의: AckMode.MANUAL에서도 예외가 발생하면 DefaultErrorHandler가 재시도를 거듭하다 포기(recover)하는 시점에 Offset을 Commit할 수 있습니다. "ack를 안 불렀는데 Commit이 된 것처럼 보이는 현상"은 에러 핸들러의 recover 동작 때문입니다.


9. 에러 핸들링 — DefaultErrorHandler

@KafkaListener에서 예외가 발생했을 때의 흐름입니다.

[1] poll() 로 메시지 수신
[2] @KafkaListener 메서드 호출
[3] 비즈니스 로직에서 예외 발생
[4] DefaultErrorHandler 로 제어권 이동
[5] 재시도 여부 결정 (BackOff 설정에 따라)
[6] 최종 포기 → Offset Commit (+ DLT 전송 옵션)

기본 설정은 0ms 간격으로 최대 10번 재시도 후 포기합니다.

재시도 횟수 & 간격 직접 설정

@Bean
public CommonErrorHandler kafkaErrorHandler() {
    // 1초 간격, 최대 2번 추가 재시도 (총 3번 시도)
    FixedBackOff backOff = new FixedBackOff(1000L, 2L);
    return new DefaultErrorHandler(backOff);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
errorDemoKafkaListenerContainerFactory() {
    var factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
    factory.setConsumerFactory(errorDemoConsumerFactory());
    factory.setCommonErrorHandler(kafkaErrorHandler()); // 에러 핸들러 등록
    return factory;
}

특정 예외는 재시도하지 않도록 설정

DefaultErrorHandler errorHandler = new DefaultErrorHandler(backOff);
// 아래 예외는 재시도 없이 바로 포기
errorHandler.addNotRetryableExceptions(IllegalArgumentException.class);

10. Dead Letter Topic (DLT)

DLT가 필요한 이유

재시도해도 절대 성공하지 않는 메시지(잘못된 JSON, 존재하지 않는 ID 등)를 계속 재시도하면 해당 파티션의 뒷 메시지들이 모두 막힙니다. 이런 메시지를 별도 토픽(DLT)으로 격리해 두면, 본 처리 흐름을 막지 않고 나중에 분석·재처리할 수 있습니다.

원본 토픽: error-demo → DLT 토픽: error-demo-dlt (Spring Kafka 기본 네이밍)

DLT Config 설정

@Bean
public CommonErrorHandler kafkaErrorHandlerWithDLT(
        KafkaTemplate<String, String> stringKafkaTemplate) {

    // 최종 실패 레코드를 DLT로 전송
    DeadLetterPublishingRecoverer recoverer =
        new DeadLetterPublishingRecoverer(stringKafkaTemplate);

    FixedBackOff backOff = new FixedBackOff(1000L, 2L); // 1초 간격, 총 3번

    return new DefaultErrorHandler(recoverer, backOff);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
errorDemoDLTKafkaListenerContainerFactory(CommonErrorHandler kafkaErrorHandlerWithDLT) {
    var factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
    factory.setConsumerFactory(errorDemoConsumerFactory());
    factory.setCommonErrorHandler(kafkaErrorHandlerWithDLT);
    return factory;
}

DLT Listener (선택)

@KafkaListener(topics = "error-demo-dlt", groupId = "error-demo-dlt-group")
public void consumeDLT(String message) {
    log.warn("[DLT] 처리 실패 메시지 수신 → 알림 발송 또는 DB 기록: {}", message);
}

DLT 운영 전략 3가지 고민

질문고려 사항
재시도 횟수는 몇 번?너무 많으면 Consumer 블로킹, 너무 적으면 일시 장애 회복 불가
어떤 예외는 바로 DLT로?addNotRetryableExceptions()로 ValidationException 등 즉시 격리
DLT 메시지 어떻게 처리?주기적 조회 → 원인 분석 → 수정 후 원본 토픽 재전송 or 별도 로그

11. 운영 설계 팁 — Topic / Key / Partition / Lag

Topic 네이밍

{도메인}.{행위}  또는  {도메인}.{행위}.{환경}

order.created
order.canceled
user.signup.prod

하나의 토픽에는 성격이 같은 이벤트만 담는 것이 좋습니다. 에러 정책·재시도 횟수·DLT 전략이 도메인마다 다를 수 있기 때문입니다.

Key 설계 — 순서 보장 vs 부하 분산

// key 없이 전송 → 라운드 로빈으로 파티션 분산 (순서 보장 X)
kafkaTemplate.send("order-events", "주문 이벤트");

// userId를 key로 → 같은 유저의 이벤트는 항상 같은 파티션 (순서 보장 O)
kafkaTemplate.send("order-events", userId, "유저 1의 주문 이벤트");
순서 보장 단위추천 Key
유저별userId
주문별orderId
계좌/포인트별accountId

Hot Partition 주의: 특정 key에 메시지가 몰리면 그 파티션만 과부하가 됩니다. 순서 보장과 부하 분산 두 가지를 모두 고려해서 key를 설계해야 합니다.

Partition 수

  • 하나의 Consumer Group에서 동시에 일할 수 있는 Consumer 수 = 파티션 수
  • 초기에는 3, 6, 12처럼 확장하기 좋은 배수로 시작
  • 파티션 수를 늘리면 key 기준 순서 보장이 깨질 수 있으므로 주의

Lag 모니터링

Lag = 토픽에 쌓인 메시지 중 아직 읽지 못한 개수

  • Lag ≈ 0 → 실시간 처리 중
  • Lag 지속 증가 → Consumer 증설 또는 처리 로직 최적화 필요

Kafka UI(http://localhost:8088) → Consumer Groups 탭에서 파티션별 Lag을 실시간으로 확인할 수 있습니다.


전체 흐름 한눈에 보기


profile
반갑습니다!

0개의 댓글