이전 상황 : Docker Container로 띄운 Kafka에 Mongodb-source-connector를 활용하여 Tutorial1.orders Topic에 Mongodb 변경 값을 Publish 함.
<이전 상황 글 링크> - Publish까지 완료
목표 : Local 환경에서 python KafkaConsumer 라이브러리를 활용하여 Consumer 설정하기
- Source connector 까지 연결하고 Change stream 설정까지 마무리한 kafka 준비
# 이 단계까지 마무리
curl -X POST -H "Content-Type: application/json" -d @/simple_source.json http://localhost:8083/connectors
- consumer 역할을 할 python 코드 작성
from kafka import KafkaConsumer
# broker에 연결
consumer = KafkaConsumer(
"Tutorial1.orders", # 읽어올 토픽명
bootstrap_servers= "localhost:9092",
auto_offset_reset="earliest",
enable_auto_commit=True,
group_id="test_group"
)
# message 값 프린트
for message in consumer:
print(message.value)
* bootstrap-servers : kafka broker 서버 주소
* auto_offset_reset : 컨슈머가 초기 offset(읽어야 할 메시지 위치)을 어떻게 설정할지
* earliest : 가장 이른 시점의 오프셋부터
* latest : 가장 최근의 오프셋부터
* enable_auto_commit : 컨슈머가 자동으로 offset을 commit 할지 결정
* group_id : 컨슈머가 속한 컨슈머 그룹을 식별하는 식별자
~결과 확인~
* Tutorial.orders 컬렉션의 변경된 값이 terminal창에 print찍힘!
Docker Kafka Broker PORT 관련
- 튜토리얼의 docker-compose 파일에는 따로 broker의 port가 맵핑된 곳이 없어서 직접 port 맵핑 처리
broker:
image: confluentinc/cp-kafka:7.2.2
hostname: broker
container_name: broker
ports:
- "9092:9092" # 이 부분이 직접 추가한 포트 맵핑
depends_on:
- zookeeper
networks:
- localnet
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_LISTENERS: LISTENER_1://broker:29092,LISTENER_2://broker:9092,
KAFKA_ADVERTISED_LISTENERS: LISTENER_1://broker:29092,LISTENER_2://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_1:PLAINTEXT,LISTENER_2:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
CONFLUENT_SUPPORT_CUSTOMER_ID: "anonymous"
KAFKA_DELETE_TOPIC_ENABLE: "true"
- KAFKA_ADVERTISED_LISTENERS : 외부 client와 broker를 연결해주는 부분
-> docker 내부에서는(e.g. connector 컨테이너) LISTENER_1 (host name : broker)
-> docker 외부에서는(e.g. local computer) LISTENER_2 (host name : localhost)
** 따라서 내부에서 요청할 때는 29092 포트로, 외부에서는 9092 포트로 연결하면 정상적으로 consumer 연결을 할 수 있다.