https://developer.confluent.io/quickstart/kafka-docker/
---
# Single-node Kafka stack for local development: one ZooKeeper instance and
# one Kafka broker that registers with it.
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.2
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: '2181'
      ZOOKEEPER_TICK_TIME: '2000'

  broker:
    image: confluentinc/cp-kafka:7.3.2
    container_name: broker
    ports:
      # To learn about configuring Kafka for access across networks see
      # https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: '1'
      # Must match the zookeeper service name and ZOOKEEPER_CLIENT_PORT above.
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      # Both listener names use the unencrypted PLAINTEXT protocol.
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT'
      # PLAINTEXT is advertised as localhost:9092 (the port published above);
      # PLAINTEXT_INTERNAL is advertised under the broker service name.
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092'
      # Replication settings are 1 because this stack runs a single broker.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: '1'
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: '1'
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: '1'
Confluent Platform KRaft all-in-one Docker Compose file (linked from the Confluent Platform quickstart):
Quickstart guide: https://docs.confluent.io/platform/current/platform-quickstart.html#ce-docker-quickstart
Compose file: https://github.com/confluentinc/cp-all-in-one/tree/7.4.0-post/cp-all-in-one-kraft/docker-compose.yml
# Start the ZooKeeper and broker containers defined in the Compose file.
$ docker-compose up
# Create a topic named "quickstart" with the kafka-topics CLI inside the broker container.
$ docker exec broker \
kafka-topics --bootstrap-server broker:9092 \
--create \
--topic quickstart
# Install the Node.js packages that the example apps require.
$ npm install express
$ npm install kafkajs
# Create the "quickstart-events" topic that the Node.js producer and consumer use.
$ docker exec broker \
kafka-topics --bootstrap-server broker:9092 \
--create \
--topic quickstart-events
// HTTP front end that publishes events to Kafka.
// POST /events/:event sends the :event path parameter as the value of a
// message on the 'quickstart-events' topic.
const express = require('express')
const { Kafka } = require('kafkajs')

const app = express()
const port = 3000

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092']
})
const producer = kafka.producer()

// Connects the shared producer; must complete before any message is sent.
const initKafka = async () => {
  await producer.connect()
}

app.post('/events/:event', async (req, res) => {
  try {
    await producer.send({
      topic: 'quickstart-events',
      messages: [
        { value: req.params.event },
      ],
    })
    res.send('successfully stored event : '+ req.params.event + '\n')
  } catch (err) {
    // A rejection inside an async handler is not caught by the framework here,
    // so answer explicitly instead of leaving the request hanging.
    console.error('failed to store event', err)
    res.status(500).send('failed to store event : ' + req.params.event + '\n')
  }
})

// Accept HTTP traffic only once the producer is connected, so the first
// requests cannot race the Kafka connection.
initKafka()
  .then(() => {
    app.listen(port, () => {
      console.log(`kafka app listening on port ${port}`)
    })
  })
  .catch((err) => {
    console.error('failed to connect Kafka producer', err)
    process.exit(1)
  })
// Kafka consumer that prints the value of every message on the
// 'quickstart-events' topic, starting from the beginning of the topic.
const { Kafka } = require('kafkajs')

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092']
})
const consumer = kafka.consumer({ groupId: 'test-group' })

// Connects, subscribes, and starts the message loop.
const initKafka = async () => {
  console.log('start subscribe')
  await consumer.connect()
  await consumer.subscribe({ topic: 'quickstart-events', fromBeginning: true })
  await consumer.run({
    eachMessage: async ({ message }) => {
      // message.value may be null (e.g. a tombstone record); guard so one
      // such message does not throw inside the handler.
      console.log({
        value: message.value === null ? null : message.value.toString(),
      })
    },
  })
}

// Surface startup failures instead of leaving an unhandled rejection.
initKafka().catch((err) => {
  console.error('kafka consumer failed', err)
  process.exit(1)
})
$ curl -X POST localhost:3000/events/hello
successfully stored event : hello
$ curl -X POST localhost:3000/events/world
successfully stored event : world
참고)