| 용어 | 설명 |
|---|---|
| Producer | Kafka에 메시지를 보내는 클라이언트. 특정 Topic에 메시지를 기록한다. |
| Consumer | Kafka에서 메시지를 읽어오는 클라이언트. 한 개 이상의 Topic을 구독한다. |
| Topic | 메시지를 저장하는 논리적 이름 공간. 로그처럼 순서대로 쌓인다. |
| Partition | Topic을 나눈 물리적 단위. 파티션 수만큼 병렬 처리가 가능하다. |
| Consumer Group | 같은 Topic을 나눠서 읽는 Consumer들의 묶음. Offset은 그룹 단위로 관리된다. |
| Offset | 파티션 안 메시지의 위치 번호(0, 1, 2, …). "어디까지 읽었는지"의 기준이다. |
KRaft 모드(Zookeeper 없이) 3-브로커 클러스터 + Kafka UI
# docker-compose.yml
version: '3.8'
services:
kafka-1:
image: apache/kafka:3.7.0
container_name: kafka-1
ports:
- "9092:9092"
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093
KAFKA_LISTENERS: CONTROLLER://:9093,INTERNAL://:29092,EXTERNAL://:9092
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-1:29092,EXTERNAL://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
KAFKA_LOG_DIRS: /var/lib/kafka/data
volumes:
- kafka1_data:/var/lib/kafka/data
networks:
- kafka-net
kafka-2:
image: apache/kafka:3.7.0
container_name: kafka-2
ports:
- "9093:9092"
environment:
KAFKA_NODE_ID: 2
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093
KAFKA_LISTENERS: CONTROLLER://:9093,INTERNAL://:29092,EXTERNAL://:9092
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-2:29092,EXTERNAL://localhost:9093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
KAFKA_LOG_DIRS: /var/lib/kafka/data
volumes:
- kafka2_data:/var/lib/kafka/data
networks:
- kafka-net
kafka-3:
image: apache/kafka:3.7.0
container_name: kafka-3
ports:
- "9094:9092"
environment:
KAFKA_NODE_ID: 3
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093
KAFKA_LISTENERS: CONTROLLER://:9093,INTERNAL://:29092,EXTERNAL://:9092
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-3:29092,EXTERNAL://localhost:9094
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
KAFKA_LOG_DIRS: /var/lib/kafka/data
volumes:
- kafka3_data:/var/lib/kafka/data
networks:
- kafka-net
kafka-ui:
image: provectuslabs/kafka-ui:latest
container_name: kafka-ui
ports:
- "8088:8080"
environment:
KAFKA_CLUSTERS_0_NAME: local-kraft-cluster
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka-1:29092,kafka-2:29092,kafka-3:29092
DYNAMIC_CONFIG_ENABLED: true
depends_on:
- kafka-1
- kafka-2
- kafka-3
networks:
- kafka-net
volumes:
kafka1_data:
kafka2_data:
kafka3_data:
networks:
kafka-net:
driver: bridge
docker-compose up -d
# Kafka UI → http://localhost:8088
build.gradle
dependencies {
implementation 'org.springframework.kafka:spring-kafka'
}
application.yml
spring:
kafka:
bootstrap-servers: localhost:9092,localhost:9093,localhost:9094
Spring Kafka의 핵심 설정을 한눈에 정리하면 이렇습니다.
ProducerFactory → KafkaTemplate (메시지 전송)
ConsumerFactory → ListenerContainerFactory → @KafkaListener (메시지 수신)
@EnableKafka
@Configuration
public class KafkaConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
// ── Producer ──────────────────────────────────────────────────────────
@Bean
public ProducerFactory<String, String> stringProducerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(props);
}
@Bean
public KafkaTemplate<String, String> stringKafkaTemplate() {
return new KafkaTemplate<>(stringProducerFactory());
}
// ── Consumer ──────────────────────────────────────────────────────────
@Bean
public ConsumerFactory<String, String> stringConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "simple-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
stringKafkaListenerContainerFactory() {
var factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
factory.setConsumerFactory(stringConsumerFactory());
return factory;
}
}
AUTO_OFFSET_RESET_CONFIG옵션
earliest: Consumer Group이 처음 시작할 때 토픽 맨 앞부터 읽는다.latest: 이 Consumer Group이 연결된 이후에 들어오는 메시지부터 읽는다.
@Service
@RequiredArgsConstructor
public class SimpleMessageProducer {
private static final String TOPIC = "simple-messages";
private final KafkaTemplate<String, String> stringKafkaTemplate;
public void send(String message) {
stringKafkaTemplate.send(TOPIC, message);
}
}
@Slf4j
@Component
public class SimpleMessageListener {
@KafkaListener(
topics = "simple-messages",
groupId = "simple-group",
containerFactory = "stringKafkaListenerContainerFactory"
)
public void consume(String message) {
log.info("받은 메시지: {}", message);
}
}
@KafkaListener의 세 가지 필수 속성
topics: 구독할 토픽 이름groupId: Consumer Group 이름 (Config의GROUP_ID_CONFIG와 일치시킨다)containerFactory: Config에서 만든ListenerContainerFactory빈 이름
문자열 대신 Java 객체를 Kafka로 주고받으려면 JSON 직렬화/역직렬화 설정이 필요합니다.
@Getter
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class SimpleEvent {
private String message;
private String worker;
private LocalDateTime createdAt;
}
// Producer 설정
@Bean
public ProducerFactory<String, SimpleEvent> eventProducerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); // JSON 직렬화
return new DefaultKafkaProducerFactory<>(props);
}
@Bean
public KafkaTemplate<String, SimpleEvent> eventKafkaTemplate() {
return new KafkaTemplate<>(eventProducerFactory());
}
// Consumer 설정
@Bean
public ConsumerFactory<String, SimpleEvent> eventConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "simple-event-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
JsonDeserializer<SimpleEvent> deserializer = new JsonDeserializer<>(SimpleEvent.class);
// 역직렬화를 허용할 패키지 지정 (보안 장치)
deserializer.addTrustedPackages("com.example.kafka.event");
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), deserializer);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, SimpleEvent>
eventKafkaListenerContainerFactory() {
var factory = new ConcurrentKafkaListenerContainerFactory<String, SimpleEvent>();
factory.setConsumerFactory(eventConsumerFactory());
return factory;
}
// Producer
@Service
@RequiredArgsConstructor
public class SimpleEventProducer {
private static final String TOPIC = "simple-events";
private final KafkaTemplate<String, SimpleEvent> eventKafkaTemplate;
public void send(SimpleEvent event) {
eventKafkaTemplate.send(TOPIC, event); // JsonSerializer가 자동으로 JSON 변환
}
}
// Listener
@Slf4j
@Component
public class SimpleEventListener {
@KafkaListener(
topics = "simple-events",
groupId = "simple-event-group",
containerFactory = "eventKafkaListenerContainerFactory"
)
public void consume(SimpleEvent event) { // JsonDeserializer가 자동으로 객체 변환
log.info("받은 이벤트: {}", event);
}
}
파티션 안 각 메시지에 붙는 순서 번호입니다. Consumer는 (Consumer Group, Partition) 조합으로 "어디까지 읽었는지"를 관리합니다.
group-a / partition-0 → offset 5 (5번까지 읽음)
group-b / partition-0 → offset 1 (1번까지 읽음)
→ Consumer Group이 다르면 같은 토픽이라도 Offset은 완전히 독립적입니다.
"이 Offset까지 읽었습니다"를 Kafka에 기록하는 행위입니다. Consumer가 재시작되면 마지막으로 Commit된 Offset의 다음 번호부터 읽습니다.
| 상황 | 문제 |
|---|---|
| Commit을 너무 빨리 | 처리 전에 "읽었다"고 기록 → 데이터 유실 |
| Commit을 너무 늦게 | 이미 처리한 메시지를 다시 읽음 → 중복 처리 |
Kafka의 기본 철학은 at-least-once (조금 중복되더라도, 최소 한 번은 꼭 전달)입니다.
Spring Kafka는 @KafkaListener 뒤에서 리스너 컨테이너가 자동으로 Offset을 Commit 해주기 때문에, 별도 코드 없이도 재시작 시 중복 수신이 발생하지 않습니다.
처리 성공을 확인한 뒤 직접 Commit하고 싶을 때 사용합니다.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
manualAckKafkaListenerContainerFactory() {
var factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
factory.setConsumerFactory(manualConsumerFactory());
// AckMode를 MANUAL로 설정
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
return factory;
}
@Slf4j
@Component
public class ManualAckConsumerListener {
@KafkaListener(
topics = "simple-messages",
groupId = "manual-ack-group",
containerFactory = "manualAckKafkaListenerContainerFactory"
)
public void consume(ConsumerRecord<String, String> record, Acknowledgment ack) {
String message = record.value();
log.info("[manual-ack] offset={}, message={}", record.offset(), message);
if (message.equals("error")) {
throw new RuntimeException("처리 실패 — Commit 하지 않음");
}
ack.acknowledge(); // ← 여기서 Commit
}
}
주의:
AckMode.MANUAL에서도 예외가 발생하면DefaultErrorHandler가 재시도를 거듭하다 포기(recover)하는 시점에 Offset을 Commit할 수 있습니다. "ack를 안 불렀는데 Commit이 된 것처럼 보이는 현상"은 에러 핸들러의 recover 동작 때문입니다.
@KafkaListener에서 예외가 발생했을 때의 흐름입니다.
[1] poll() 로 메시지 수신
[2] @KafkaListener 메서드 호출
[3] 비즈니스 로직에서 예외 발생
[4] DefaultErrorHandler 로 제어권 이동
[5] 재시도 여부 결정 (BackOff 설정에 따라)
[6] 최종 포기 → Offset Commit (+ DLT 전송 옵션)
기본 설정은 0ms 간격으로 최대 10번 재시도 후 포기합니다.
@Bean
public CommonErrorHandler kafkaErrorHandler() {
// 1초 간격, 최대 2번 추가 재시도 (총 3번 시도)
FixedBackOff backOff = new FixedBackOff(1000L, 2L);
return new DefaultErrorHandler(backOff);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
errorDemoKafkaListenerContainerFactory() {
var factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
factory.setConsumerFactory(errorDemoConsumerFactory());
factory.setCommonErrorHandler(kafkaErrorHandler()); // 에러 핸들러 등록
return factory;
}
DefaultErrorHandler errorHandler = new DefaultErrorHandler(backOff);
// 아래 예외는 재시도 없이 바로 포기
errorHandler.addNotRetryableExceptions(IllegalArgumentException.class);
재시도해도 절대 성공하지 않는 메시지(잘못된 JSON, 존재하지 않는 ID 등)를 계속 재시도하면 해당 파티션의 뒷 메시지들이 모두 막힙니다. 이런 메시지를 별도 토픽(DLT)으로 격리해 두면, 본 처리 흐름을 막지 않고 나중에 분석·재처리할 수 있습니다.
원본 토픽:
error-demo→ DLT 토픽:error-demo-dlt(Spring Kafka 기본 네이밍)
@Bean
public CommonErrorHandler kafkaErrorHandlerWithDLT(
KafkaTemplate<String, String> stringKafkaTemplate) {
// 최종 실패 레코드를 DLT로 전송
DeadLetterPublishingRecoverer recoverer =
new DeadLetterPublishingRecoverer(stringKafkaTemplate);
FixedBackOff backOff = new FixedBackOff(1000L, 2L); // 1초 간격, 총 3번
return new DefaultErrorHandler(recoverer, backOff);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
errorDemoDLTKafkaListenerContainerFactory(CommonErrorHandler kafkaErrorHandlerWithDLT) {
var factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
factory.setConsumerFactory(errorDemoConsumerFactory());
factory.setCommonErrorHandler(kafkaErrorHandlerWithDLT);
return factory;
}
@KafkaListener(topics = "error-demo-dlt", groupId = "error-demo-dlt-group")
public void consumeDLT(String message) {
log.warn("[DLT] 처리 실패 메시지 수신 → 알림 발송 또는 DB 기록: {}", message);
}
| 질문 | 고려 사항 |
|---|---|
| 재시도 횟수는 몇 번? | 너무 많으면 Consumer 블로킹, 너무 적으면 일시 장애 회복 불가 |
| 어떤 예외는 바로 DLT로? | addNotRetryableExceptions()로 ValidationException 등 즉시 격리 |
| DLT 메시지 어떻게 처리? | 주기적 조회 → 원인 분석 → 수정 후 원본 토픽 재전송 or 별도 로그 |
{도메인}.{행위} 또는 {도메인}.{행위}.{환경}
order.created
order.canceled
user.signup.prod
하나의 토픽에는 성격이 같은 이벤트만 담는 것이 좋습니다. 에러 정책·재시도 횟수·DLT 전략이 도메인마다 다를 수 있기 때문입니다.
// key 없이 전송 → 라운드 로빈으로 파티션 분산 (순서 보장 X)
kafkaTemplate.send("order-events", "주문 이벤트");
// userId를 key로 → 같은 유저의 이벤트는 항상 같은 파티션 (순서 보장 O)
kafkaTemplate.send("order-events", userId, "유저 1의 주문 이벤트");
| 순서 보장 단위 | 추천 Key |
|---|---|
| 유저별 | userId |
| 주문별 | orderId |
| 계좌/포인트별 | accountId |
Hot Partition 주의: 특정 key에 메시지가 몰리면 그 파티션만 과부하가 됩니다. 순서 보장과 부하 분산 두 가지를 모두 고려해서 key를 설계해야 합니다.
Lag = 토픽에 쌓인 메시지 중 아직 읽지 못한 개수
Kafka UI(http://localhost:8088) → Consumer Groups 탭에서 파티션별 Lag을 실시간으로 확인할 수 있습니다.
