Spring Kafka를 시작하게 되었다
이 글에서는 docker-compose로 kafka 실행시키고 spring과 연결되는 지까지만 확인할 예정이다.
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
container_name: kafka
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- 29092:29092
- 9092:9092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
apache 공식으로도 kafka이미지가 있긴 한데 일단 confluentinc/cp-kafka를 사용하기로 했다
kafka 커넥터라던가... 스키마 레지스트리라던가... 추가할 게 많다
docker exec -it kafka /bin/bash
저기 있는 kafka는 container 이름이다. 저 명령어를 사용해서 kafka 컨테이너에 접근하도록 하자
당장 알아야하는 kafka 콘솔은 4가지이다.
토픽 생성 /bin/kafka-topics --create --topic test --bootstrap-server localhost:9092 --partitions 1토픽 목록 /bin/kafka-topics --bootstrap-server localhost:9092 --list토픽에 메시지 추가 /bin/kafka-console-producer --bootstrap-server localhost:9092 --topic test토픽의 메시지 받기 (--from-beginning = 메시지를 처음부터 받기) /bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning
apache의 공식 kafka이미지를 사용하는 경우 /opt/kafka/bin에 있고
confluentinc/cp-kafka 이미지를 사용하는 경우 /bin아래 다 있다.
spring:
kafka:
bootstrap-servers:
- localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: test
org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'
이런 에러가 발생했다면 serializer가 제대로 되었는지 확인해주세요.
Caused by: java.lang.IllegalStateException: No group.id found in consumer config, container properties, or @KafkaListener annotation; a group.id is required when group management is used.
이 에러가 발생했다면 consumer.group-id를 추가해주세요.
@Slf4j
@RequiredArgsConstructor
@Component
public class TestKafkaProducer {
private final KafkaTemplate<String , String > kafkaTemplate;
public void sendMessage(String topic, String message){
kafkaTemplate.send(topic, message);
log.info("topic : {}, message : {}", topic, message);
}
}
@ActiveProfiles("test")
@SpringBootTest
class TestKafkaProducerTest {
@Autowired private TestKafkaProducer testKafkaProducer;
@Test
public void sendMessage(){
testKafkaProducer.sendMessage("test", "delete all");
}
}
kafka 컨테이너에 exec로 접근해서 test topic에 delete all 메시지가 있는지 확인해봅시다.
@Slf4j
@Component
public class TestKafkaConsumer {
@KafkaListener(topics="test")
public void consume(String message){
log.info("Received message : {}", message);
}
}
kafka-console-producer로 메시지를 남긴 뒤 제대로 spring 서버가 제대로 읽어오는 지 확인해봅시다