[kafka] kafkaConsumer활용 Docker kafka broker와 local consumer 연결

songmoana·2024년 2월 23일
0

kafka

목록 보기
1/3

이전 상황 : Docker Container로 띄운 Kafka에 Mongodb-source-connector를 활용하여 Tutorial1.orders Topic에 Mongodb 변경 값을 Publish 함.

<이전 상황 글 링크> - Publish까지 완료

목표 : Local 환경에서 python KafkaConsumer 라이브러리를 활용하여 Consumer 설정하기


  1. Source connector 까지 연결하고 Change stream 설정까지 마무리한 kafka 준비
# 이 단계까지 마무리
curl -X POST -H "Content-Type: application/json" -d @/simple_source.json http://localhost:8083/connectors
  1. consumer 역할을 할 python 코드 작성
from kafka import KafkaConsumer

# broker에 연결
consumer = KafkaConsumer(
    "Tutorial1.orders", # 읽어올 토픽명
    bootstrap_servers= "localhost:9092", 
    auto_offset_reset="earliest", 
    enable_auto_commit=True,
    group_id="test_group"
)


# message 값 프린트
for message in consumer:
    print(message.value)

* bootstrap-servers : kafka broker 서버 주소
* auto_offset_reset : 컨슈머가 초기 offset(읽어야 할 메시지 위치)을 어떻게 설정할지
	* earliest : 가장 이른 시점의 오프셋부터
    * latest : 가장 최근의 오프셋부터
* enable_auto_commit : 컨슈머가 자동으로 offset을 commit 할지 결정
* group_id : 컨슈머가 속한 컨슈머 그룹을 식별하는 식별자

~결과 확인~

* Tutorial.orders 컬렉션의 변경된 값이 terminal창에 print찍힘!


Docker Kafka Broker PORT 관련

  • 튜토리얼의 docker-compose 파일에는 따로 broker의 port가 맵핑된 곳이 없어서 직접 port 맵핑 처리
  broker:
    image: confluentinc/cp-kafka:7.2.2
    hostname: broker
    container_name: broker
    ports:
      - "9092:9092" # 이 부분이 직접 추가한 포트 맵핑
    depends_on:
      - zookeeper
    networks:
      - localnet
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_LISTENERS: LISTENER_1://broker:29092,LISTENER_2://broker:9092,
      KAFKA_ADVERTISED_LISTENERS: LISTENER_1://broker:29092,LISTENER_2://localhost:9092 
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_1:PLAINTEXT,LISTENER_2:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      CONFLUENT_SUPPORT_CUSTOMER_ID: "anonymous"
      KAFKA_DELETE_TOPIC_ENABLE: "true"
  • KAFKA_ADVERTISED_LISTENERS : 외부 client와 broker를 연결해주는 부분
    -> docker 내부에서는(e.g. connector 컨테이너) LISTENER_1 (host name : broker)
    -> docker 외부에서는(e.g. local computer) LISTENER_2 (host name : localhost)
    ** 따라서 내부에서 요청할 때는 29092 포트로, 외부에서는 9092 포트로 연결하면 정상적으로 consumer 연결을 할 수 있다.
profile
옹모아나 - 개발백과

0개의 댓글