의존성 추가
implementation 'org.springframework.kafka:spring-kafka'
Kafka + Zookeeper Docker Compose
version: '2'
services:
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka:2.12-2.5.0
container_name: kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock
Producer
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.example.easytable.reservation.dto.request.ReservationCreateReqDto;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Repository;
import java.util.concurrent.TimeoutException;
@Repository("kafka")
@RequiredArgsConstructor
@Slf4j
public class KafkaMessagePublisherImpl implements MessagePublisher {
private static final int MAX_ATTEMPTS = 5;
private static final long RETRY_DELAY_MS = 500;
private final KafkaTemplate<String, ReservationCreateReqDto> kafkaTemplate;
@Value("${kafka.topic.reservation:create-reservation}")
private String topic;
@Override
public void publish(ReservationCreateReqDto dto) throws TimeoutException {
int attempt = 0;
while (attempt < MAX_ATTEMPTS) {
try {
kafkaTemplate.send(topic, dto.getRequestId(), dto).get();
return;
} catch (Exception e) {
attempt++;
sleepThread();
}
}
throw new TimeoutException("Kafka 메시지 전송 실패");
}
private void sleepThread() {
try {
Thread.sleep(RETRY_DELAY_MS);
} catch (InterruptedException e) {
System.out.println("thread interrupted");
}
}
}
Consumer
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_1");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(config);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
@Component
@RequiredArgsConstructor
@Slf4j
public class KafkaReservationListener {
private final SinksRegistry sinkRegistry;
// 실제 예약 생성 로직을 담당하는 서비스 (DB 저장 등)
private final ReservationService reservationProcessingService;
@KafkaListener(topics = "${kafka.topic.reservation:create-reservation}", groupId = "reservation-group")
public void listen(ReservationCreateReqDto request) {
try {
ReservationCreateResDto response = reservationProcessingService.createReservation(request);
// SinkRegistry를 통해 요청을 보낸 스레드에 결과 전송
sinkRegistry.completeSink(request.getRequestId(), response);
} catch (Exception e) {
log.error(e.getMessage());
sinkRegistry.completeSinkExceptionally(request.getRequestId(), e);
}
}
}
Kafka에서는 add 시 maxlen을 신경쓰지 않아도 되는가?
Kafka에서는 add 시 maxlen을 지정하지 않는다.
전체 로그 크기, 보존 기간 등은 Topic 단위의 보존 정책에 따라 관리된다.
결론적으로 Producer는 maxlen에 신경 쓸 필요 없다.
그렇다면 Topic을 어떻게 관리할 수 있는가?
KafkaAdmin 빈을 통해 애플리케이션 시작 시 Topic을 자동으로 생성할 수 있다.
@Bean
public NewTopic topic() {
Map<String, String> configs = new HashMap<>();
configs.put("retention.ms", "604800000"); // 예: 7일 보존
return TopicBuilder.name("create-reservation")
.partitions(3)
.replicas(1)
.configs(configs)
.build();
}
다만 이미 존재하는 토픽의 설정을 변경하는 것은 Spring이 아닌 Kafka CLI와 같은 도구에서 관리하는 것이 일반적이다.
Spring에서 Kafka 사용 시 Message에 ACK를 보내 관리하지 않아도 되는가?
Spring Kafka는 기본적으로 컨테이너가 ACK 전송 등 오프셋을 자동으로 관리하는 auto-commit 기능을 제공한다.
필요 시에는 수동으로도 commit 가능하다.
설정 코드를 통해 consumer 클래스들을 각 group에 subscribe되도록 명시하지 않아도 되는가?
@KafkaListener, groupId 설정을 통해 consumer들이 자동으로 해당 group에 등록된다.
또한 같은 groupId를 갖는 소비자들을 하나의 그룹으로 자동 인식하기에 별도의 group 생성 명령이 필요하지 않다.
여러 Kafka consumer 스레드들을 어떻게 동시에 작동시킬 수 있는가?
ConcurrentKafkaListenerContainerFactory.setConcurrency() 메서드를 사용해 동시에 실행될 consumer thread 수를 조절할 수 있다.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, ReservationCreateReqDto> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, ReservationCreateReqDto> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3); // 예: 3개의 스레드로 병렬 처리
return factory;
}