


version: "2"
services:
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka:2.12-2.5.0
container_name: kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock

클러스터 1개를 가진 카프카 서버를 실행
docker-compose up -d

컨테이너 실행 확인
docker ps

터미널에서 도커 컨테이너 안으로 접속
docker exec -it kafka /bin/bash

설정 파일을 수정
vi /opt/kafka/config/server.properties
# 주석 해제
listeners=PLAINTEXT://:9092
# 추가
delete.topic.enable=true
auto.create.topics.enable=true
외부에서 접속가능하도록 할 때 추가
advertised.listeners=PLAINTEXT://공인ip:9092
명령어를 사용하기 위해서 프롬프트 이동
cd /opt/kafka/bin
첫번째 카프카 서버의 첫번째 영역에 토픽(exam-topic) 생성
kafka-topics.sh --create --bootstrap-server localhost:9092 --topic exam-topic

토픽 리스트 조회
kafka-topics.sh --bootstrap-server localhost:9092 --list

kafka-topics.sh --delete --zookeeper zookeeper:2181 --topic exam-topic
cd /opt/kafka/bin
kafka-console-producer.sh --topic exam-topic --broker-list localhost:9092안녕하세요
반갑습니다
docker exec -it kafka /bin/bashcd /opt/kafka/binkafka-console-consumer.sh --bootstrap-server localhost:9092 --topic exam-topic --from-beginning
가상 환경을 생성 (Mac 이나 Linux는 pythone 대신에 python3)
python -m venv kafka_env
가상환경 활성화
Mac 이나 Linux:
source kafka_env/bin/activate
Windows:
kafka_env\Scripts\activate
패키지 설치
pip install kafka-python
pip install six==1.6.0

메시지 전송하는 python 코드 작성 - producer.py
import sys
import six
if sys.version_info >= (3, 12, 0):
sys.modules['kafka.vendor.six.moves'] = six.moves
from kafka import KafkaProducer
import json
class MessageProducer:
def __init__(self, broker, topic):
self.broker = broker
self.topic = topic
#key_serializer=str.encode 를 추가하면 key 와 함께 전송
#그렇지 않으면 value 만 전송
self.producer = KafkaProducer(
bootstrap_servers=self.broker,
value_serializer=lambda x: json.dumps(x).encode("utf-8"),
acks=0,
api_version=(2, 5, 0),
key_serializer=str.encode,
retries=3,
)
def send_message(self, msg, auto_close=True):
try:
print(self.producer)
future = self.producer.send(self.topic, value=msg, key="key")
self.producer.flush() # 비우는 작업
if auto_close:
self.producer.close()
future.get(timeout=2)
return {"status_code": 200, "error": None}
except Exception as exc:
raise exc
# 브로커와 토픽명을 지정
broker = ["localhost:9092"]
topic = "exam-topic"
pd = MessageProducer(broker, topic)
#전송할 메시지 생성
msg = {"name": "John", "age": 30}
res = pd.send_message(msg)
print(res)
메시지 전송
python producer.py

실행 한 후 터미널의 카프카 컨슈머가 데이터를 받는지 확인

메시지 전송하는 python 코드 작성 - consumer.py
import sys
import six
if sys.version_info >= (3, 12, 0):
sys.modules['kafka.vendor.six.moves'] = six.moves
from kafka import KafkaConsumer
import json
class MessageConsumer:
def __init__(self, broker, topic):
self.broker = broker
self.consumer = KafkaConsumer(
topic, # Topic to consume
bootstrap_servers=self.broker,
value_deserializer=lambda x: x.decode(
"utf-8"
), # Decode message value as utf-8
group_id="my-group", # Consumer group ID
auto_offset_reset="earliest", # Start consuming from earliest available message
enable_auto_commit=True, # Commit offsets automatically
)
def receive_message(self):
try:
for message in self.consumer:
#print(message.value)
result = json.loads(message.value)
for k, v in result.items():
print(k, ":", result[k])
print(result["name"])
print(result["age"])
except Exception as exc:
raise exc
# 브로커와 토픽명을 지정한다.
broker = ["localhost:9092"]
topic = "exam-topic"
cs = MessageConsumer(broker, topic)
cs.receive_message()
작성한 파일을 실행하면 에러가 발생
: 이전에 JSON 형식이 아닌 데이터를 전송했는데 그 데이터를 파싱할려고 해서 에러가 발생
docker exec -it kafka /bin/bash
cd /opt/kafka/bin
kafka-topics.sh --delete --zookeeper zookeeper:2181 --topic exam-topic 