Kafka - Quick Start

오픈소스·2023년 5월 7일
0
post-thumbnail

https://developer.confluent.io/quickstart/kafka-docker/

docker-compose.yml

---
# Defines two services: one ZooKeeper container and one Kafka broker container.
version: '3'
services:
  # ZooKeeper instance; the broker reaches it as zookeeper:2181
  # (see KAFKA_ZOOKEEPER_CONNECT below).
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.2
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  # Kafka broker; declared as depending on the zookeeper service.
  broker:
    image: confluentinc/cp-kafka:7.3.2
    container_name: broker
    ports:
    # To learn about configuring Kafka for access across networks see
    # https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/
    # Only 9092 is published to the host; 29092 is not listed here.
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      # Service name and client port of the zookeeper service defined above.
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      # Both listener names are mapped to the PLAINTEXT protocol.
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      # PLAINTEXT is advertised as localhost:9092 (the port published above);
      # PLAINTEXT_INTERNAL is advertised as broker:29092 (the container name).
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092
      # Replication settings are all 1 - presumably because this file defines
      # a single broker; raise them if more brokers are added (TODO confirm).
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

Confluent Platform KRaft all-in-one Docker Compose file (linked from the Confluent Platform quick start):
Quick start: https://docs.confluent.io/platform/current/platform-quickstart.html#ce-docker-quickstart
Compose file: https://github.com/confluentinc/cp-all-in-one/tree/7.4.0-post/cp-all-in-one-kraft/docker-compose.yml

$ docker-compose up

Create a topic

$ docker exec broker \
kafka-topics --bootstrap-server broker:9092 \
             --create \
             --topic quickstart

Write messages to the topic / Read messages from the topic


https://gihyun.com/156

$ npm install express
$ npm install kafkajs

$ docker exec broker \
kafka-topics --bootstrap-server broker:9092 \
             --create \
             --topic quickstart-events

express producer

const express = require('express')
const { Kafka } = require('kafkajs')

const app = express()
const port = 3000

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092']
})

const producer = kafka.producer()

/**
 * Connects the shared Kafka producer.
 *
 * @returns {Promise<void>} resolves once `producer.connect()` has completed.
 */
const initKafka = async () => {
  await producer.connect()
}

/**
 * POST /events/:event
 *
 * Publishes the `:event` path parameter as the value of a single message on
 * the `quickstart-events` topic and confirms it to the caller.
 * Responds with HTTP 500 if the message could not be sent.
 */
app.post('/events/:event', async (req, res) => {
  const { event } = req.params

  // The path parameter is echoed back verbatim, so answer as plain text
  // instead of Express's default text/html for string bodies.
  res.type('text/plain')

  try {
    await producer.send({
      topic: 'quickstart-events',
      messages: [
        { value: event },
      ],
    })
    res.send(`successfully stored event : ${event}\n`)
  } catch (err) {
    // Without this catch a rejected send() would be an unhandled rejection
    // and the HTTP request would never receive a response.
    console.error('failed to store event', err)
    res.status(500).send(`failed to store event : ${event}\n`)
  }
})

/**
 * Starts the application: connect the producer first, then accept HTTP
 * traffic, so that no request is handled while the producer is disconnected.
 *
 * @returns {Promise<void>}
 */
const start = async () => {
  await initKafka()
  app.listen(port, () => {
    console.log(`kafka app listening on port ${port}`)
  })
}

start().catch((err) => {
  // Startup failure (e.g. the broker is unreachable): report it and exit
  // with a non-zero status rather than leaving a floating rejected promise.
  console.error('failed to start kafka app', err)
  process.exit(1)
})

consumer (plain Node.js script using kafkajs; Express is not required)

const { Kafka } = require('kafkajs')

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092']
})

const consumer = kafka.consumer({ groupId: 'test-group' })

/**
 * Connects the consumer, subscribes to the `quickstart-events` topic from the
 * beginning, and logs the value of every message it receives.
 *
 * @returns {Promise<void>} resolves once `consumer.run()` has been started.
 */
const initKafka = async () => {
  console.log('start subscribe')
  await consumer.connect()
  await consumer.subscribe({ topic: 'quickstart-events', fromBeginning: true })
  await consumer.run({
    eachMessage: async ({ message }) => {
      // A message may carry no value (null); calling toString() on it would
      // throw inside the handler, so log null for such messages instead.
      const value = message.value == null ? null : message.value.toString()
      console.log({
        value,
      })
    },
  })
}

initKafka().catch((err) => {
  // Startup failure (e.g. the broker is unreachable): report it and exit
  // with a non-zero status rather than leaving a floating rejected promise.
  console.error('failed to start kafka consumer', err)
  process.exit(1)
})
$ curl -X POST localhost:3000/events/hello
successfully stored event : hello
$ curl -X POST localhost:3000/events/world
successfully stored event : world

참고)

0개의 댓글