git clone https://github.com/wurstmeister/kafka-docker
위 명령어로 kafka-docker 폴더를 clone 하면
이렇게 폴더가 생성된다.
오늘은 해당 폴더 내 docker-compose-single-broker.yml 파일을 통해 kafka 토픽을 생성하고 메시지를 produce, consume 해보자.
version: '2'
services:
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
container_name: kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock
위와 같이 local 환경에서 kafka와 zookeeper를 구동하는 컴포즈 파일 설정을 마치고
clone 받은 kafka-docker 폴더에서
docker-compose -f docker-compose-single-broker.yml up -d
명령어를 실행해 docker 컨테이너를 구동한다.
이후 docker ps -a
명령어를 통해 컨테이너가 run중인지 확인하자.
잘 구동되고 있다.
구동까지 했다면 이제 토픽을 만들어보자.
docker exec -it kafka bash
위 명령어로 kafka 내부로 들어간 후
kafka-topics.sh --zookeeper localhost --create --topic 0406M --replication-factor 1 --partitions 1
이렇게 0406M 이라는 topic을 생성해준다.
Created topic {{topic name}}
메시지가 나오면 topic 생성 완료
dependencies {
...
implementation 'org.springframework.boot:spring-boot-starter-web'
// kafka
implementation 'org.springframework.kafka:spring-kafka'
...
}
server:
servlet:
context-path: /api
spring:
kafka:
// 로컬 환경
bootstrap-servers: localhost:9092
consumer:
# 식별 가능한 Consumer Group Id
group-id: test0406
# Kafka 서버에 초기 offset이 없거나, 서버에 현재 offset이 더 이상 존재하지 않을 경우 수행할 작업을 설정
# latest: 가장 최근에 생산된 메시지로 offeset reset
# earliest: 가장 오래된 메시지로 offeset reset
# none: offset 정보가 없으면 Exception 발생
auto-offset-reset: earliest
# 데이터를 받아올 때, key/value를 역직렬화
# JSON 데이터를 받아올 것이라면 JsonDeserializer
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
# producer bootstrap servers가 따로 존재하면 설정
# 데이터를 보낼 때, key/value를 직렬화
# JSON 데이터를 보낼 것이라면 JsonDeserializer
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
logging:
level:
org.hibernate.sql: debug
root: info
@RestController
@RequiredArgsConstructor
@RequestMapping("/kafka")
public class KafkaController {
private final KafkaProducer producer;
@PostMapping("send")
public String sendMessage(@RequestParam("message") String message) {
producer.sendMessage(message);
return "success";
}
}
@Service
@RequiredArgsConstructor
@Slf4j
public class KafkaProducer {
private static final String TOPIC = "0406M";
private final KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String message) {
log.info(String.format("Produce message: %s", message));
kafkaTemplate.send(TOPIC, message);
}
}
@Service
@Slf4j
public class KafkaConsumer {
@KafkaListener(topics = "0406M", groupId = "test0406")
public void consume(String message) throws IOException {
log.info(String.format("Consumed message : %s", message));
}
}
0406M 이라는 topic을 미리 생성해놓았기 때문에 정상적으로 작동되는 것을 볼 수 있다.
만약 에러가 발생한다면 해당 topic이 존재하지 않아 발생하는 에러일 가능성이 크다.
postman을 통해 메시지를 produce 해보았다.
success
가 정상적으로 나온다.
springBoot로 돌아와보면
정상적으로 produce 되고
consume도 정상적으로 되는 것을 확인할 수 있다.
예전 프로젝트를 진행할 때 STOMP와 함께 kafka로 채팅 시스템을 개발한 적이 있었는데, 당시에 메시지 produce는 정상적으로 되지만 consume이 정상적으로 되지 않을 때 해당 부분에 대한 처리를 어떻게 해야할 지 몰라 많이 헤맸었던 기억이 난다. 이 부분에 대한 자료도 찾아보아야겠다.