메세지 큐(Message Queue), Kafka

서유진·2022년 10월 7일
2

메세지 큐

메시지 큐(Message Queue)는 프로세스 또는 프로그램 간에 데이터를 교환할 때 사용하는 통신 방법 중에 하나로, 메시지 지향 미들웨어(Message Oriented Middleware:MOM)를 구현한 시스템을 의미

MOM(Message Oriented Middleware)
비동기 메세지를 사용하는 프로그램 간 데이터 송수신

특징

  • 비동기 : Queue에 넣기 때문에 나중에 처리 가능
  • 비동조 : Application과 분리 할 수 있다
  • 탄력성 : 일부가 실패해도 전체는 영향을 받지 않는다
  • 과잉 : 실패할 경우 재실행 가능
  • 확장성 : 다수의 프로세스들이 큐에 메시지를 보낼 수 있다.

언제 쓰나요?

  • 서로 다른 db를 사용하거나
  • 동시에 많은 양의 프로세스를 처리할 때
  • 서버 죽음(?)에 대비하기 위해,
  • 이미지, 비디오등 대용량 데이터 처리등 메모리를 많이 쓰는 작업

주문을 하고 결제를 할 때 시간이 과하게 오래 걸릴 때, 결제창으로 바로 넘어가고 주문한 데이터를 메세지 큐에 보관한다. 결제를 처리하고 나면 주문한 데이터를 창고에 전달하는 등 비동기적으로 프로그램을 다룰 때 쓰인다.

  • RabbitMQ, ActiveMQ, ZeroMQ, Kafka

pub/sub

publish/subscribe의 줄임말이며, 비동기식 메세징 패턴

이벤트(메시지)를 발행하는 Publisher가 존재하며,
Publisher는 특정 Channel(혹은 Topic)에 이벤트를 전송

특정 Channel(혹은 Topic)을 구독하는 Subscriber가 존재하며,
Publisher에 관계없이 발행된 이벤트를 수신

  • publisher(게시자)
    message를 생성해 topic에 전달하는 서버

  • message(메세지)
    publisher로부터 subscriber에게 최종적으로 전달되는 데이터와 속성(property)의 조합

  • topic(토픽)
    publisher가 메세지를 전달하는 리소스
  • subscription(구독)
    특정 단일 주제의 메시지 스트림이 구독 애플리케이션으로 전달되는 과정을 나타내는, 이름이 지정된 리소스
  • subscriber(구독자)
    메세지를 수신하는 서버

pub/sub 사이에서 중간다리 역할을 하는 브로커, 혹은 버스라고 불리는 존재가 추가적으로 존재

Kafka의 producer / consumer

publisher -> producer
subscribe -> consumer

  1. Producer는 Topic에 이벤트를 보낸다.

  2. 이 이벤트는 Topic의 각 Partition에 분산되어 저장된다.

  3. Topic을 구독하고 있는 Consumer group 내의 Consumer는 각각 1개 이상의 partition으로부터 이벤트를 가져온다.
    (항상 partition 수를 consumer보다 같거나 크게 해주는 것이 좋다.)

  • Consumer group 안의 Consumer 가 topic의 이벤트를 확인한다면 같은 그룹 내부의 타 Consumer는 확인할 수 없다 (한 이벤트에 대해 한 번의 기능만 작동된다.)

코드는 블로그 를 따라갔다.

// producer.js

const express = require('express')
const app = express()
const port = 3000

const { Kafka } = require('kafkajs')

const kafka = new Kafka({
  clientId: 'my-app-pro',
  brokers: ['localhost:9092']
})

const producer = kafka.producer()

const initKafka = async () => {
  await producer.connect()
}

app.post('/events/:event', async (req, res) => {
  await producer.send({
    topic: 'quickstart-events',
    messages: [
      { value: req.params.event },
    ],
  })
  res.send('successfully stored event : '+ req.params.event + '\n')
})

app.listen(port, async  () => {
  console.log(`kafka app listening on port ${port}`)
})

initKafka().catch(e => console.error(`[example/consumer] ${e.message}`, e));

vscode에서 cmd를 실행한다.

	> node producer.js

실행하고 나면 3000포트에 서버가 켜지고, [post] /events/:event 를 postman등으로 메세지를 보내본다.

다음은 보낸 메세지를 받아보는 과정이다.

// consumer.js

const { Kafka } = require('kafkajs')

const kafka = new Kafka({
  clientId: 'my-app-cus',
  brokers: ['localhost:9092']
})

const consumer = kafka.consumer({ groupId: 'test-group' })
const topic = 'quickstart-events'
const initKafka = async () => {
  console.log('start subscribe')
  await consumer.connect()
  await consumer.subscribe({ topic, fromBeginning: true })
  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
        console.log({
        topic,
        value: message.value.toString(),
        })
  },
})
}

initKafka().catch(e => console.error(`[example/consumer] ${e.message}`, e))

vscode에서 cmd를 하나 더 켜서 consumer를 실행한다.

	> node consumer.js

producer에서 보낸 메세지가 consumer에 도착해 있는 것을 확인할 수 있다.

profile
Backend Dev.

1개의 댓글

comment-user-thumbnail
2022년 10월 16일

전에 기술 블로그를 구경하다가 docker와 spring을 이용한 Kafka 구성하는 걸 봤을 땐 좀 어렵게 다가왔는데, 이 글을 읽으면서 개념부터 사용방향, 구성 과정까지 한 번에 볼 수 있어서 이해하기에 수월했던 것 같습니다! 글 잘 봤습니다ㅏ!!

답글 달기