version: '2'
services:
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka:2.12-2.5.0
container_name: kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock
docker-compose up -d
터미널에서 도커 컨테이너 안으로 접속
docker exec -it kafka /bin/bash
설정 파일을 수정 - 내용을 추가
cd /opt/kafka/config
vi server.properties
listeners=PLAINTEXT://:9092
delete.topic.enable=true
auto.create.topics.enable=true
bash# cd /opt/kafka/bin
bash# kafka-topics.sh --bootstrap-server localhost:9092 --create --topic exam-topic --partitions 1 --replication-factor 1
bash# kafka-topics.sh --bootstrap-server localhost:9092 --list
bash# kafka-topics.sh --delete --zookeeper zookeeper:2181 --topic exam-topic
docker exec -it kafka /bin/bashbash# cd /opt/kafka/bin
bash# kafka-console-producer.sh --topic exam-topic --broker-list localhost:9092
>메세지를 작성
docker exec -it kafka /bin/bashbash# cd /opt/kafka/bin
bash# ./kafka-console-consumer.sh --topic exam-topic --from-beginning --bootstrap-server localhost:9092
docker로 kafka를 만들었을 때 메시지 전송과 수신이 잘 수행되는 것을 알 수 있다.

python -m venv kafka_env
kafka_env\Scripts\activate
pip install kafka-python
pip install six==1.6.0
producer.py
import sys
import six
if sys.version_info >= (3, 12, 0):
sys.modules['kafka.vendor.six.moves'] = six.moves
from kafka import KafkaProducer
import json
class MessageProducer:
def __init__(self, broker, topic):
self.broker = broker
self.topic = topic
#key_serializer=str.encode 를 추가하면 key 와 함께 전송
#그렇지 않으면 value 만 전송
self.producer = KafkaProducer(
bootstrap_servers=self.broker,
value_serializer=lambda x: json.dumps(x).encode("utf-8"),
acks=0,
api_version=(2, 5, 0),
key_serializer=str.encode,
retries=3,
)
def send_message(self, msg, auto_close=True):
try:
print(self.producer)
future = self.producer.send(self.topic, value=msg, key="key")
self.producer.flush() # 비우는 작업
if auto_close:
self.producer.close()
future.get(timeout=2)
return {"status_code": 200, "error": None}
except Exception as exc:
raise exc
# 브로커와 토픽명을 지정
broker = ["localhost:9092"]
topic = "exam-topic"
pd = MessageProducer(broker, topic)
#전송할 메시지 생성
msg = {"name": "John", "age": 30}
res = pd.send_message(msg)
print(res)
python producer.py
consumer.py
import sys
import six
if sys.version_info >= (3, 12, 0):
sys.modules['kafka.vendor.six.moves'] = six.moves
from kafka import KafkaConsumer
import json
class MessageConsumer:
def __init__(self, broker, topic):
self.broker = broker
self.consumer = KafkaConsumer(
topic, # Topic to consume
bootstrap_servers=self.broker,
value_deserializer=lambda x: x.decode(
"utf-8"
), # Decode message value as utf-8
group_id="my-group", # Consumer group ID
auto_offset_reset="earliest", # Start consuming from earliest available message
enable_auto_commit=True, # Commit offsets automatically
)
def receive_message(self):
try:
for message in self.consumer:
#print(message.value)
result = json.loads(message.value)
for k, v in result.items():
print(k, ":", result[k])
print(result["name"])
print(result["age"])
except Exception as exc:
raise exc
# 브로커와 토픽명을 지정한다.
broker = ["localhost:9092"]
topic = "exam-topic"
cs = MessageConsumer(broker, topic)
cs.receive_message()
python consumer.py

작성한 파일을 실행하면 에러가 발생: 이전에 JSON 형식이 아닌 데이터를 전송했는데 그 데이터를 파싱할려고 해서 에러가 발생
토픽을 삭제하고 다시 전송한 후 실행
# docker exec -it kafka /bin/bash bash-5.1# cd /opt/kafka/bin bash-5.1# kafka-topics.sh --delete --zookeeper zookeeper:2181 --topic exam-topic