[Spring] -๐Ÿ“š Kafka ๊ฐœ๋… ๋ฐ ์ฑ„ํŒ… ์‹œ์Šคํ…œ Kafka ๋„์ž…๊ธฐ(feat: STOMP)

CodeByHanยท2025๋…„ 6์›” 17์ผ

์Šคํ”„๋ง

๋ชฉ๋ก ๋ณด๊ธฐ
27/33

์‚ฌ์ด๋“œ ํ”„๋กœ์ ํŠธ๋กœ STOMP ๊ธฐ๋ฐ˜ ์ฑ„ํŒ… ์‹œ์Šคํ…œ์„ ๊ฐœ๋ฐœํ–ˆ๋‹ค.
๋จผ์ € ๋ฉ”์ธ ์„œ๋ฒ„์™€ ์ฑ„ํŒ… ์„œ๋ฒ„๋ฅผ ๋ถ„๋ฆฌํ•˜์—ฌ ๊ตฌ์ถ•ํ–ˆ๋Š”๋ฐ, ์‹ค์ œ ์šด์˜ ํ™˜๊ฒฝ์—์„œ ์ฑ„ํŒ… ์„œ๋ฒ„๋ฅผ ์Šค์ผ€์ผ ์•„์›ƒ(Scale Out)ํ•ด์•ผ ํ•  ๊ฒฝ์šฐ์—๋„ ๋ฉ”์‹œ์ง€๊ฐ€ ์‹ค์‹œ๊ฐ„์œผ๋กœ ์ „๋‹ฌ๋˜๋Š”์ง€ ๊ถ๊ธˆํ–ˆ๋‹ค.
ํ…Œ์ŠคํŠธํ•ด ๋ณธ ๊ฒฐ๊ณผ, ๋ถ„์‚ฐ๋œ ์ฑ„ํŒ… ์„œ๋ฒ„ ๊ฐ„์—๋Š” ๋ฉ”์‹œ์ง€ ๋™๊ธฐํ™”๊ฐ€ ์ด๋ฃจ์–ด์ง€์ง€ ์•Š์•„ ๋‹ค๋ฅธ ์‚ฌ์šฉ์ž์—๊ฒŒ ์ฑ„ํŒ…์ด ํ‘œ์‹œ๋˜์ง€ ์•Š๋Š” ๋ฌธ์ œ๊ฐ€ ๋ฐœ์ƒํ–ˆ๋‹ค.

๊ทธ ์ด์œ ๋Š” ์ธ๋ฉ”๋ชจ๋ฆฌ(Simple) ๋ธŒ๋กœ์ปค ์‚ฌ์šฉํ–ˆ๊ธฐ ๋•Œ๋ฌธ์ด๋‹ค.
Spring WebSocket์˜ ๊ธฐ๋ณธ STOMP ๋ธŒ๋กœ์ปค(enableSimpleBroker)๋Š” ์„œ๋ฒ„ ์ธ์Šคํ„ด์Šค ๋‚ด ๋ฉ”๋ชจ๋ฆฌ ์œ„์—์„œ๋งŒ ๋™์ž‘ํ•œ๋‹ค. ๋”ฐ๋ผ์„œ ํ•œ ๋…ธ๋“œ์—์„œ ๋ฐ›์€ ๋ฉ”์‹œ์ง€๋Š” ๊ทธ ๋…ธ๋“œ์˜ ๋ฉ”๋ชจ๋ฆฌ ๊ตฌ๋…์ž์—๊ฒŒ๋งŒ ์ „๋‹ฌ๋˜๊ณ  ๋‹ค๋ฅธ ๋…ธ๋“œ์— ์—ฐ๊ฒฐ๋œ ํด๋ผ์ด์–ธํŠธ๋Š” ์ „ํ˜€ ๋ชจ๋ฅด๊ฒŒ ๋œ๋‹ค.

๋”ฐ๋ผ์„œ ์™ธ๋ถ€ ๋ธŒ๋กœ์ปค๋ฅผ ์‚ฌ์šฉํ•ด์•ผํ–ˆ๋Š”๋ฐ ๋‚˜๋Š” Kafka๋ฅผ ๋„์ž…ํ•ด๋ณด๊ธฐ๋กœ ๋งˆ์Œ ๋จน์—ˆ๋‹ค.

์ฑ„ํŒ… ์‹œ์Šคํ…œ์—์„œ Kafka๋ฅผ ์„ ํƒํ•œ ์ด์œ 

Kafka, RabbitMQ, Redis Pub/Sub ์ค‘์— ์ง„์งœ ์˜ค๋žœ์‹œ๊ฐ„ ๊ณ ๋ฏผ์„ ํ–ˆ์—ˆ๋‹ค. ์ฒ˜์Œ์—๋Š” RabbitMQ๋ฅผ ์‚ฌ์šฉํ•˜๋ ค๊ณ  ํ–ˆ์ง€๋งŒ ์‹œ๊ฐ„์ด ์ง€๋‚ ์ˆ˜๋ก Kafka๋ฅผ ์‚ฌ์šฉํ•˜๊ธฐ๋กœ ๋งˆ์Œ ๋จน์—ˆ๋‹ค. ์ฒ˜์Œ์— RabbitMQ๋ฅผ ์‚ฌ์šฉํ•˜๋ ค๊ณ  ํ•œ ๋ชฉ์ ์€ ๋‚ฎ์€ ์ฒ˜๋ฆฌ๋Ÿ‰๊ณผ ์งง์€ ์ง€์—ฐ์‹œ๊ฐ„์ด๋ผ๋Š” ์žฅ์ ์„ ๊ฐ€์ง€๊ณ  ์žˆ์—ˆ๊ธฐ ๋•Œ๋ฌธ์ด๋‹ค.

Redis Pub/Sub๋Š” ๋ฉ”๋ชจ๋ฆฌ ๊ธฐ๋ฐ˜์œผ๋กœ ๋น ๋ฅธ ์„ฑ๋Šฅ์„ ์ œ๊ณตํ•˜์ง€๋งŒ, ๋ฉ”์‹œ์ง€ ์œ ์‹ค ๊ฐ€๋Šฅ์„ฑ์ด ๋„ˆ๋ฌด ์ปธ๊ธฐ ๋•Œ๋ฌธ์— ์ฒ˜์Œ๋ถ€ํ„ฐ ํ›„๋ณด์—์„œ ์ œ์™ธํ–ˆ๋‹ค. ์ด๋Š” ํ™”์ƒ ํšŒ์˜๋‚˜ ์ŠคํŠธ๋ฆฌ๋ฐ ๊ฐ™์€ ์šฉ๋„์—๋Š” ์ ํ•ฉํ•˜๊ฒ ์ง€๋งŒ, Slack์ด๋‚˜ ์นด์นด์˜คํ†ก ๊ฐ™์€ ์ฑ„ํŒ… ์‹œ์Šคํ…œ์—์„œ๋Š” ์‚ฌ์šฉ์ž๊ฐ€ ์ž ์‹œ๋ผ๋„ ์—ฐ๊ฒฐ์ด ๋Š๊ฒผ์„ ๋•Œ ๋†“์นœ ๋ฉ”์‹œ์ง€๋ฅผ ๋ณต๊ตฌํ•  ์ˆ˜ ์—†๋Š” ์น˜๋ช…์ ์ธ ๋‹จ์ ์ด ์กด์žฌํ–ˆ๋‹ค.

RabbitMQ๋Š” ์„ค์ •์ด ๊ฐ„ํŽธํ•˜๊ณ  AMQP ํ”„๋กœํ† ์ฝœ ๋•๋ถ„์— ๋น ๋ฅด๊ฒŒ ์‹œ์ž‘ํ•  ์ˆ˜ ์žˆ์œผ๋ฉฐ, ๋””์Šคํฌ ์“ฐ๊ธฐ ์—†์ด ๋ฉ”๋ชจ๋ฆฌ๋งŒ์œผ๋กœ ๋ฉ”์‹œ์ง€๋ฅผ ์ฃผ๊ณ ๋ฐ›์„ ๋•Œ ์ง€์—ฐ์‹œ๊ฐ„์ด ๋งค์šฐ ์งง๋‹ค๋Š” ์žฅ์ ์ด ์žˆ์—ˆ๋‹ค. ์‚ฌ์ด๋“œ ํ”„๋กœ์ ํŠธ๋ผ ์œ ์ €๊ฐ€ ๋ช‡๋ฐฑ ๋ช… ์ˆ˜์ค€์ผ ๊ฒƒ์ด๋ผ ์˜ˆ์ƒํ•ด โ€œ๊ณผ๋„ํ•œ ํŠธ๋ž˜ํ”ฝ์ด๋‚˜ ๋ณต์žกํ•œ ํŒŒํ‹ฐ์…”๋‹ ์—†์ด๋„ ์ถฉ๋ถ„ํžˆ ์ปค๋ฒ„ํ•  ์ˆ˜ ์žˆ๊ฒ ๋‹คโ€๊ณ  ํŒ๋‹จํ–ˆ์ง€๋งŒ, ์‹ค์ œ๋กœ ๋Œ€์šฉ๋Ÿ‰ ๋ฉ”์‹œ์ง€ ์ฒ˜๋ฆฌ ์‹œ ์ฒ˜๋ฆฌ๋Ÿ‰์˜ ํ•œ๊ณ„๊ฐ€ ๋ช…ํ™•ํ•˜๋‹ค๋Š” ๊ฒƒ์„ ์•Œ๊ฒŒ๋˜์—ˆ๋‹ค. k6๋กœ ๋ถ€ํ•˜๋ฅผ ๊ฑธ์–ด๋ณด๋ฉด ์žฅ์• ๊ฐ€ ๋ฐœ์ƒํ•  ๊ฒƒ์ด ๋ถ„๋ช…ํ–ˆ๊ณ , ๋ฉ”์‹œ์ง€๊ฐ€ ๊ณผ๋„ํ•˜๊ฒŒ ์Œ“์ผ ๊ฒฝ์šฐ ๋ฉ”๋ชจ๋ฆฌ ๋ถ€์กฑ์œผ๋กœ ์‹œ์Šคํ…œ ์ „์ฒด ์•ˆ์ •์„ฑ์ด ์œ„ํ˜‘๋ฐ›๋Š” ๋ฌธ์ œ๋„ ์šฐ๋ ค๋˜์—ˆ๋‹ค.

๋”ฐ๋ผ์„œ ๋†’์€ ์ฒ˜๋ฆฌ๋Ÿ‰, ๋ฉ”์‹œ์ง€ ์œ ์‹ค ๋ฐฉ์ง€, ์•ˆ์ •์ ์ธ ์Šค์ผ€์ผ ์•„์›ƒ์„ ๋ชจ๋‘ ์ถฉ์กฑํ•˜๋Š” Kafka๋ฅผ ๋„์ž…ํ•˜๊ฒŒ ๋˜์—ˆ๋‹ค.

๐Ÿ“Œ ์นดํ”„์นด(Kafka)

์นดํ”„์นด(Kafka)๋Š” ๋ถ„์‚ฐ ์ŠคํŠธ๋ฆฌ๋ฐ ํ”Œ๋žซํผ์œผ๋กœ, ๋Œ€๋Ÿ‰์˜ ๋ฐ์ดํ„ฐ๋ฅผ ์ฒ˜๋ฆฌํ•˜๊ณ  ์‹ค์‹œ๊ฐ„์œผ๋กœ ์ „์†กํ•˜๋Š” ๋ฐ ์‚ฌ์šฉ๋œ๋‹ค. ๋ชจ๋“  ๋ฐ์ดํ„ฐ๋Š” ๋กœ๊ทธ ํ˜•์‹์œผ๋กœ ํŒŒ์ผ ์‹œ์Šคํ…œ์— ๊ธฐ๋ก๋œ๋‹ค. ์—ฌ๊ธฐ์„œ ๋งํ•˜๋Š” ๋กœ๊ทธ๋Š” ์ถ”๊ฐ€๋งŒ ๊ฐ€๋Šฅํ•˜๋ฉฐ, ์‹œ๊ฐ„์ˆœ์œผ๋กœ ์™„์ „ํžˆ ์ •๋ ฌ๋œ ๋ฐ์ดํ„ฐ์˜ ํ๋ฆ„(๋ ˆ์ฝ”๋“œ ์‹œํ€€์Šค)์„ ์˜๋ฏธํ•œ๋‹ค. ๋กœ๊ทธ๋ฅผ ํ•œ๊ณณ์— ๋ชจ์•„ ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์žˆ๋„๋ก ์ค‘์•™์ง‘์ค‘ํ™”๋˜์–ด ์žˆ์œผ๋ฉฐ, ๋Œ€์šฉ๋Ÿ‰ ๋ฐ์ดํ„ฐ๋ฅผ ์ˆ˜์ง‘ํ•˜๊ณ  ์‹ค์‹œ๊ฐ„ ์ŠคํŠธ๋ฆฌ๋ฐ์œผ๋กœ ์†Œ๋น„๊ฐ€ ๊ฐ€๋Šฅํ•˜๋‹ค.

์šฐ์„  ๋ฉ”์„ธ์ง€/์ด๋ฒคํŠธ ๋ธŒ๋กœ์ปค์™€ ๋ฉ”์„ธ์ง€ ํ๋ฅผ ์•„๋Š”๊ฒŒ ๋” ์ข‹๋‹ค๊ณ  ํŒ๋‹จํ•ด์„œ ์ •๋ฆฌ ํ• ๋ ค๊ณ  ํ•œ๋‹ค.

๋ฉ”์„ธ์ง€ ํ (Message Queue, MQ)

๋ฉ”์‹œ์ง€ ํ(MQ)๋Š” ๋ฉ”์‹œ์ง€ ์ง€ํ–ฅ ๋ฏธ๋“ค์›จ์–ด(MOM : Message Oriented Middleware)๋ฅผ ๊ตฌํ˜„ํ•œ ์‹œ์Šคํ…œ์œผ๋กœ ํ”„๋กœ๊ทธ๋žจ(ํ”„๋กœ์„ธ์Šค) ๊ฐ„์˜ ๋ฐ์ดํ„ฐ๋ฅผ ๊ตํ™˜ํ•  ๋•Œ ์‚ฌ์šฉํ•˜๋Š” ๊ธฐ์ˆ ์„ ์˜๋ฏธํ•œ๋‹ค.

  • Producer: ๋ฉ”์‹œ์ง€๋ฅผ ๋ณด๋‚ด๋Š” ์ฃผ์ฒด

  • Consumer: ๋ฉ”์‹œ์ง€๋ฅผ ๋ฐ›๋Š” ์ฃผ์ฒด

  • Queue: ๋ฉ”์‹œ์ง€๋ฅผ ์ฐจ๋ก€๋กœ ๋ณด๊ด€ํ–ˆ๋‹ค๊ฐ€ ์ „๋‹ฌํ•˜๋Š” ๋Œ€๊ธฐ์ค„

๐Ÿค” ๋ฉ”์„ธ์ง€ ์ง€ํ–ฅ ๋ฏธ๋“ค์›จ์–ด(MOM)??

  • ์‘์šฉ ์†Œํ”„ํŠธ์›จ์–ด ๊ฐ„์˜ ๋น„๋™๊ธฐ์  ๋ฐ์ดํ„ฐ ํ†ต์‹ ์„ ์œ„ํ•œ ์†Œํ”„ํŠธ์›จ์–ด
  • ์ƒ์‚ฐ์ž(producer)๋Š” ๋ฉ”์‹œ์ง€๋ฅผ ๋ณด๋‚ด๊ณ , ์†Œ๋น„์ž(consumer)๋Š” ํ•„์š”ํ•  ๋•Œ ๊บผ๋‚ด ์ฒ˜๋ฆฌ
  • ์ฆ‰, ๋น„๋™๊ธฐ์ (Asynchronous) ๋ฐฉ์‹์„ ์ด์šฉํ•ด์„œ ํ”„๋กœ์„ธ์Šค๊ฐ„์˜ ๋ฐ์ดํ„ฐ๋ฅผ ์ฃผ๊ณ  ๋ฐ›๋Š” ๊ธฐ๋Šฅ์„ ์œ„ํ•œ ์‹œ์Šคํ…œ

๋ฉ”์„ธ์ง€ ๋ธŒ๋กœ์ปค(Message Broker) / ์ด๋ฒคํŠธ ๋ธŒ๋กœ์ปค(Event Broker)

๋ฉ”์‹œ์ง€ ๋ธŒ๋กœ์ปค(Message Broker)

์• ํ”Œ๋ฆฌ์ผ€์ด์…˜ ๊ฐ„์— โ€œ๋ฉ”์‹œ์ง€โ€๋ฅผ ์•ˆ์ „ํ•˜๊ฒŒ ์ „๋‹ฌํ•ด ์ฃผ๋Š” ์ค‘๊ฐœ์ž
๋ฉ”์„ธ์ง€ ์ง€ํ–ฅ ๋ฏธ๋“ค์›Œ์–ด(Message-Oriented Middleware)๋ฅผ ๊ตฌํ˜„ํ•˜๋Š”๋ฐ ์‚ฌ์šฉ๋˜๋Š” ์‹œ์Šคํ…œ

  • ์ƒ์‚ฐ์ž(producer)๊ฐ€ ๋ณด๋‚ธ ๋ฉ”์‹œ์ง€๋ฅผ ๋ฐ›์•„์„œ
  • ํ(Queue)๋‚˜ ํ† ํ”ฝ(Topic)์— ์ €์žฅํ•ด ๋‘์—ˆ๋‹ค๊ฐ€
  • ์†Œ๋น„์ž(consumer)์—๊ฒŒ ์ „๋‹ฌํ•ด ์ค€๋‹ค.
    โ†’ ์ฃผ๋กœ ๋น„๋™๊ธฐ ์ž‘์—… ๋ถ„์‚ฐ, ์žฅ์• ๋‚˜ ํŠธ๋ž˜ํ”ฝ ๊ธ‰์ฆ ์‹œ์—๋„ ์‹ ๋ขฐ์„ฑ ์žˆ๊ฒŒ ์ฒ˜๋ฆฌํ•  ๋•Œ ์‚ฌ์šฉํ•œ๋‹ค.
    -> ์ด๋Ÿฌํ•œ ๊ตฌ์กฐ๋ฅผ ๋ณดํ†ต pub/sub๊ตฌ์กฐ๋ผ๊ณ  ํ•˜๋ฉฐ ๋Œ€ํ‘œ์ ์œผ๋กœ๋Š” Redis, RabbitMQ๊ฐ€ ์žˆ๋‹ค.

์ด๋ฒคํŠธ ๋ธŒ๋กœ์ปค(Event Broker)

์‹œ์Šคํ…œ์—์„œ ๋ฐœ์ƒํ•œ โ€œ์ด๋ฒคํŠธ(์‚ฌ๊ฑด)โ€๋ฅผ ๋ฐœํ–‰(publish)ํ•˜๊ณ ,
๊ด€์‹ฌ ์žˆ๋Š” ์†Œ๋น„์ž(Subscriber)์—๊ฒŒ ์‹ค์‹œ๊ฐ„์œผ๋กœ ์ „ํŒŒํ•ด ์ฃผ๋Š” ์ค‘๊ฐœ์ž

  • ์ด๋ฒคํŠธ๋Š” ๊ณผ๊ฑฐ ํžˆ์Šคํ† ๋ฆฌ๋ฅผ ๋กœ๊ทธ๋กœ ๋‚จ๊ฒจ ์žฌ์ƒ(replay)๋„ ๊ฐ€๋Šฅ
  • ์ด๋ฒคํŠธ ์ŠคํŠธ๋ฆฌ๋ฐ(Event Streaming) ํ”Œ๋žซํผ(์˜ˆ: Kafka, Pulsar) ํ˜•ํƒœ๋กœ ๋งŽ์ด ๊ตฌํ˜„
    โ†’ ๋งˆ์ดํฌ๋กœ์„œ๋น„์Šค๋‚˜ ์ด๋ฒคํŠธ ๊ธฐ๋ฐ˜ ์•„ํ‚คํ…์ฒ˜์—์„œ ์„œ๋น„์Šค ๊ฐ„ ๋А์Šจํ•œ ๊ฒฐํ•ฉ๊ณผ ์‹ค์‹œ๊ฐ„ ๋ฐ˜์‘์„ ์œ„ํ•ด ์‚ฌ์šฉํ•œ๋‹ค.

๋ฉ”์‹œ์ง€ ๋ธŒ๋กœ์ปค๋Š” ์ด๋ฒคํŠธ ๋ธŒ๋กœ์ปค๊ฐ€ ๋  ์ˆ˜ ์—†์ง€๋งŒ, ์ด๋ฒคํŠธ ๋ธŒ๋กœ์ปค๋Š” ๋ฉ”์‹œ์ง€ ๋ธŒ๋กœ์ปค ์—ญํ• ์„ ํ•  ์ˆ˜ ์žˆ๋‹ค.

ํŠน์„ฑ๋ฉ”์‹œ์ง€ ๋ธŒ๋กœ์ปค์ด๋ฒคํŠธ ๋ธŒ๋กœ์ปค
๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ๋ฉ”์‹œ์ง€ ์ฒ˜๋ฆฌ ํ›„ ์‚ญ์ œ์ด๋ฒคํŠธ ์˜๊ตฌ ์ €์žฅ ๊ฐ€๋Šฅ
์žฌ์ฒ˜๋ฆฌ์ œํ•œ์ ์ด๋ฒคํŠธ ์žฌ์ƒ ๊ฐ€๋Šฅ
ํ™•์žฅ์„ฑ์ƒ๋Œ€์ ์œผ๋กœ ์ œํ•œ์ ๋†’์€ ํ™•์žฅ์„ฑ
์šฉ๋„๋น„๋™๊ธฐ ์ž‘์—… ์ฒ˜๋ฆฌ์‹ค์‹œ๊ฐ„ ์ŠคํŠธ๋ฆผ ์ฒ˜๋ฆฌ
๊ฒฐํ•ฉ๋„์ƒ๋Œ€์ ์œผ๋กœ ๋†’์Œ๋‚ฎ์€ ๊ฒฐํ•ฉ๋„

Pub/Sub ๊ตฌ์กฐ

  1. ์ด๋ฒคํŠธ(๋ฉ”์‹œ์ง€)๋ฅผ ๋ฐœํ–‰ํ•˜๋Š” Publisher๊ฐ€ ์กด์žฌํ•˜๋ฉฐ, Publisher๋Š” ํŠน์ • Channel(ํ˜น์€ Topic)์— ์ด๋ฒคํŠธ๋ฅผ ์ „์†กํ•œ๋‹ค.
  2. ํŠน์ • Channel(ํ˜น์€ Topic)์„ ๊ตฌ๋…ํ•˜๋Š” Subscriber๊ฐ€ ์กด์žฌํ•˜๋ฉฐ, Publisher์— ๊ด€๊ณ„์—†์ด ๋ฐœํ–‰๋œ ์ด๋ฒคํŠธ๋ฅผ ๋ฐ›์„ ์ˆ˜ ์žˆ๋‹ค.

์นดํ”„์นด๋ฅผ ๊ตฌ์„ฑํ•˜๋Š” ์ฃผ์š” ์š”์†Œ

  • ์ด๋ฒคํŠธ(Event) : Kafka ์„ ํ†ตํ•ด Producer ์™€ Consumer ๊ฐ€ ๋ฐ์ดํ„ฐ๋ฅผ ์ฃผ๊ณ ๋ฐ›๋Š” ๋‹จ์œ„, ๋ฉ”์„ธ์ง€

  • ์นดํ”„์นด ํด๋Ÿฌ์Šคํ„ฐ(Kafka Cluster): Kafka Broker ๋“ค์˜ ๋ชจ์ž„, Kafka ๋Š” ํ™•์žฅ์„ฑ๊ณผ ๊ณ ๊ฐ€์šฉ์„ฑ์„ ์œ„ํ•˜์—ฌ Broker ๋“ค์ด ํด๋Ÿฌ์Šคํ„ฐ๋กœ ๊ตฌ์„ฑ
    -> ์—ฌ๋Ÿฌ๊ฐœ์˜ Kafka Broker ๋กœ ์šด์˜์„ ํ•˜๋ฉด ๋™์‹œ์— ๋” ๋งŽ์€ ๋ฐ์ดํ„ฐ๋ฅผ ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์žˆ์œผ๋ฉฐ, ํ•„์š”์— ๋”ฐ๋ผ Broker ์„ ์ถ”๊ฐ€ ๊ฐ€๋Šฅ
    -> ํด๋Ÿฌ์Šคํ„ฐ ๋‚ด ํ•œ Broker์— ์žฅ์• ๊ฐ€ ๋ฐœ์ƒํ•ด๋„ ๋‹ค๋ฅธ Broker๊ฐ€ ํŒŒํ‹ฐ์…˜ ๋ฆฌ๋”๋ฅผ ์ž๋™์œผ๋กœ ์Šน๊ฒฉํ•ด ๋ฌด์ค‘๋‹จ ์šด์˜์ด ๊ฐ€๋Šฅ

  • ๋ธŒ๋กœ์ปค(Broker) : ๊ฐ๊ฐ์˜ Kafka ์„œ๋ฒ„, ๋™์ผ ๋…ธ๋“œ์— ์—ฌ๋Ÿฌ ๋ธŒ๋กœ์ปค ๊ฐ€๋Šฅ
    -> ์‹คํ–‰๋œ Kafka ์„œ๋ฒ„๋ฅผ ๋งํ•˜๋ฉฐ, Kafka ๊ทธ ์ž์ฒด

  • ํ”„๋กœ๋“€์„œ(Producer): Kafka์— ์ด๋ฒคํŠธ(๋ฉ”์‹œ์ง€)๋ฅผ ๋ฐœํ–‰(Publish)ํ•˜๋Š” ์—ญํ• ์„ ํ•˜๋Š” ํด๋ผ์ด์–ธํŠธ

  • ์ปจ์Šˆ๋จธ(Consumer): ํŠน์ • ํ† ํ”ฝ(Topic)์„ ๊ตฌ๋…(Subscribe)ํ•˜๊ณ , ๊ทธ ์•ˆ์— ๋‹ด๊ธด ์ด๋ฒคํŠธ๋ฅผ ๋ฐ›์•„ ์ฒ˜๋ฆฌํ•˜๋Š” ํด๋ผ์ด์–ธํŠธ

  • ํ† ํ”ฝ(Topic) : Kafka์—์„œ ๋ฉ”์‹œ์ง€๋ฅผ ๋ถ„๋ฅ˜ํ•˜๊ณ  ์ €์žฅํ•˜๋Š” ๋…ผ๋ฆฌ์ ์ธ ๊ณต๊ฐ„, ํŒŒ์ผ์‹œ์Šคํ…œ์˜ ํด๋”์™€ ์œ ์‚ฌ

  • ํŒŒํ‹ฐ์…˜(Partition) : Topic์€ ์—ฌ๋Ÿฌ Broker์— ๋ถ„์‚ฐ๋˜์–ด ์ €์žฅ๋˜๋ฉฐ, ์ด๋ ‡๊ฒŒ ๋ถ„์‚ฐ๋œ topic์„ partition์ด๋ผ๊ณ  ํ•จ
    -> ํ† ํ”ฝ์„ ์—ฌ๋Ÿฌ ๊ฐœ๋กœ ๋‚˜๋ˆˆ ์ž‘์€ ๋‹จ์œ„

  • ์˜คํ”„์…‹(offset) : ์ปจ์Šˆ๋จธ์—์„œ ๋ฉ”์„ธ์ง€๋ฅผ ์–ด๋””๊นŒ์ง€ ์ฝ์—ˆ๋Š”์ง€ ์ €์žฅํ•˜๋Š” ๊ฐ’
    -> ์ปจ์Šˆ๋จธ ๊ทธ๋ฃน์˜ ์ปจ์Šˆ๋จธ๋“ค์€ ๊ฐ๊ฐ์˜ ํŒŒํ‹ฐ์…˜์— ์ž์‹ ์ด ๊ฐ€์ ธ๊ฐ„ ๋ฉ”์‹œ์ง€์˜ ์œ„์น˜ ์ •๋ณด(offset) ์„ ๊ธฐ๋ก
    -> ์ปจ์Šˆ๋จธ ์žฅ์•  ๋ฐœ์ƒ ํ›„ ๋‹ค์‹œ ์‚ด์•„๋‚˜๋„, ์ „์— ๋งˆ์ง€๋ง‰์œผ๋กœ ์ฝ์—ˆ๋˜ ์œ„์น˜์—์„œ๋ถ€ํ„ฐ ๋‹ค์‹œ ์ฝ๊ธฐ ๊ฐ€๋Šฅ

์‰ฝ๊ฒŒ ์ •๋ฆฌํ•˜๊ธฐ

์ด๋ฒคํŠธ(Event) = ์šฐํŽธ๋ฌผ(ํŽธ์ง€ยท์†Œํฌ)

  • Kafka์—์„œ๋Š” โ€œ์ด๋ฒคํŠธโ€๊ฐ€ ๋ฐ์ดํ„ฐ๋ฅผ ๋‹ด์€ ์ตœ์†Œ ๋‹จ์œ„
  • โ€œ์šฐํŽธ๋ฌผ(ํŽธ์ง€ยท์†Œํฌ)โ€์— ํ•ด๋‹น
  • ๋ณด๋‚ผ ์‚ฌ๋žŒ(Producer)์ด ๋‚ด์šฉ์„ ๋‹ด์•„ ๋ฐœ์†กํ•˜๊ณ , ๋ฐ›์„ ์‚ฌ๋žŒ(Consumer)์ด ์—ด์–ด๋ณด๋Š” ๋‹จ์œ„

ํ† ํ”ฝ(Topic) = ์šฐ์ฒดํ†ต

  • Kafka์—์„œ ๋ฉ”์‹œ์ง€๋ฅผ ๋ถ„๋ฅ˜ยท์ €์žฅํ•˜๋Š” ๋…ผ๋ฆฌ์ ์ธ ๊ณต๊ฐ„
  • โ€œ์šฐ์ฒดํ†ตโ€์— ํ•ด๋‹น
  • ํ”„๋กœ๋“€์„œ๊ฐ€ ์ด๋ฒคํŠธ๋ฅผ ๋„ฃ๊ณ , ์ปจ์Šˆ๋จธ๊ฐ€ ๊ตฌ๋…ํ•ด ๊บผ๋‚ด ์ฝ์Œ
  • ์˜ˆ๋ฅผ ๋“ค์–ด, chat-messages๋Š” ์ฑ„ํŒ… ์ „์šฉ ์šฐ์ฒดํ†ต, order-events๋Š” ์ฃผ๋ฌธ ์ „์šฉ ์šฐ์ฒดํ†ต์ด๋ผ๊ณ  ์ƒ๊ฐ

ํŒŒํ‹ฐ์…˜(Partition) = ์šฐ์ฒดํ†ต ์† ๋ฒˆํ˜ธ ๋ถ™์€ ์นธ

  • ํ•˜๋‚˜์˜ ํ† ํ”ฝ์„ ์—ฌ๋Ÿฌ ๊ฐœ์˜ ์ž‘์€ ๋‹จ์œ„๋กœ ๋‚˜๋ˆˆ ๊ฒƒ
  • โ€œ์šฐ์ฒดํ†ต ์† ๋ฒˆํ˜ธ ๋ถ™์€ ์นธโ€์— ํ•ด๋‹น
  • ํ•˜๋‚˜์˜ ์šฐ์ฒดํ†ต(Topic)์€ ์—ฌ๋Ÿฌ ์นธ(Partition)์œผ๋กœ ๋‚˜๋‰˜์–ด ์žˆ๊ณ ,
  • ๊ฐ™์€ roomId(ํ‚ค)๋ฅผ ๊ฐ€์ง„ ์ด๋ฒคํŠธ๋Š” ํ•ญ์ƒ ๊ฐ™์€ ์นธ์— ๋“ค์–ด๊ฐ€ ์ˆœ์„œ ๋ณด์žฅ
  • ์นธ๋งˆ๋‹ค ๋ฒˆํ˜ธ(Partition 0,1,2โ€ฆ)๊ฐ€ ๋ถ™์–ด ์žˆ๊ณ , ๊ฐ™์€ ์šฐํŽธ๋ฒˆํ˜ธ(roomId) ๋กœ ๋ณด๋‚ธ ํŽธ์ง€๋Š” ์–ธ์ œ๋‚˜ ๊ฐ™์€ ์นธ์— ๋“ค์–ด๊ฐ€๋„๋ก ์„ค๊ณ„
  • ๊ทธ๋ž˜์„œ ๊ทธ ์นธ ์•ˆ์—์„œ๋Š” ๋“ค์–ด์˜จ ์ˆœ์„œ๋Œ€๋กœ ํŽธ์ง€๊ฐ€ ๋ณด๊ด€๋˜์–ด ์ˆœ์„œ๊ฐ€ ๋ณด์žฅ

๋ธŒ๋กœ์ปค(Broker) = ์šฐ์ฒด๊ตญ ์ง€์ 

  • Kafka ์„œ๋ฒ„ ํ•˜๋‚˜๋ฅผ ๊ฐ€๋ฆฌํ‚ค๋Š” ์šฉ์–ด
  • โ€œ์šฐ์ฒด๊ตญ ์ง€์ โ€ ๊ณผ ๊ฐ™์Œ
  • ํด๋Ÿฌ์Šคํ„ฐ ๋‚ด์—์„œ ํŒŒํ‹ฐ์…˜ ๋ฆฌ๋” ์—ญํ• ์„ ์ˆ˜ํ–‰

ํด๋Ÿฌ์Šคํ„ฐ(Cluster) = ์šฐ์ฒด๊ตญ ์ง€์ ๋“ค์ด ๋ชจ์ธ ์ „๊ตญ๋ง

  • ์—ฌ๋Ÿฌ ๋ธŒ๋กœ์ปค๊ฐ€ ๋ชจ์—ฌ ๊ตฌ์„ฑ๋œ ์ง‘ํ•ฉ
  • โ€œ์ „๊ตญ ์šฐ์ฒด๊ตญ ๋„คํŠธ์›Œํฌโ€์— ํ•ด๋‹น
  • ํ•œ ์ง€์ (๋ธŒ๋กœ์ปค) ์žฅ์•  ์‹œ์—๋„ ๋‹ค๋ฅธ ์ง€์ ์ด ๋Œ€์‹  ์ฒ˜๋ฆฌ

ํ”„๋กœ๋“€์„œ(Producer) = ํŽธ์ง€ ๋ถ€์น˜๋Š” ์‚ฌ๋žŒ

  • Kafka์— ์ด๋ฒคํŠธ๋ฅผ ๋ฐœํ–‰(Publish)ํ•˜๋Š” ํด๋ผ์ด์–ธํŠธ
  • โ€œํŽธ์ง€ ๋ถ€์น˜๋Š” ์‚ฌ๋žŒโ€ ์—ญํ• 
  • ํ† ํ”ฝ๊ณผ ํ‚ค(roomId)๋ฅผ ์ง€์ •ํ•ด ๋ฉ”์‹œ์ง€ ์ „์†ก

์ปจ์Šˆ๋จธ(Consumer) = ์šฐ์ฒดํ†ต์—์„œ ํŽธ์ง€ ๊บผ๋‚ด ์ฝ๋Š” ์‚ฌ๋žŒ

  • ํŠน์ • ํ† ํ”ฝ์„ ๊ตฌ๋…(Subscribe)ํ•˜๊ณ  ์ด๋ฒคํŠธ๋ฅผ ์ฒ˜๋ฆฌํ•˜๋Š” ํด๋ผ์ด์–ธํŠธ
  • โ€œ์šฐ์ฒดํ†ต์—์„œ ํŽธ์ง€ ๊บผ๋‚ด ์ฝ๋Š” ์‚ฌ๋žŒโ€ ์—ญํ• 
  • ๊ฐ™์€ ์ปจ์Šˆ๋จธ ๊ทธ๋ฃน ๋‚ด์—์„œ ํŒŒํ‹ฐ์…˜์„ ๋‚˜๋ˆ  ์ฝ์–ด ๋ณ‘๋ ฌ ์ฒ˜๋ฆฌ

์นดํ”„์นด Pub/Sub ๊ตฌ์กฐ

์œ„์—์„œ ์ž‘์„ฑํ•œ Pub/Sub ๊ตฌ์กฐ๋ž‘ ๋งค์šฐ ์œ ์‚ฌํ•˜๊ฒŒ ์ž‘๋™ํ•œ๋‹ค. Producer๋Š” Topic์— ์ด๋ฒคํŠธ๋ฅผ ๋ณด๋‚ด๊ณ , ์ด ์ด๋ฒคํŠธ๋Š” Topic์˜ ๊ฐ Partition์— ๋ถ„์‚ฐ๋˜์–ด ์ €์žฅ๋œ๋‹ค. ํŠน์ • Topic์„ ๊ตฌ๋…ํ•˜๊ณ  ์žˆ๋Š” Consumer Group๋‚ด์˜ Consumer๋Š” ๊ฐ๊ฐ 1๊ฐœ ์ด์ƒ์˜ Partition ์œผ๋กœ ๋ถ€ํ„ฐ ์ด๋ฒคํŠธ๋ฅผ ๊ฐ€์ ธ์˜จ๋‹ค. ๋งŒ์•ฝ Partition ๊ฐฏ์ˆ˜๋ณด๋‹ค Consumer๊ฐ€ ๋งŽ๋‹ค๋ฉด, ์•„๋ฌด๊ฒƒ๋„ ์•ˆํ•˜๋Š” Consumer๊ฐ€ ๋งŽ์•„์งˆ ๊ฐ€๋Šฅ์„ฑ์ด ์กด์žฌํ•˜๊ธฐ ๋•Œ๋ฌธ์— ํ•ญ์ƒ Partition ์ˆ˜๋ฅผ Consumer๋ณด๋‹ค ๊ฐ™๊ฑฐ๋‚˜ ํฌ๊ฒŒ ํ•ด์ฃผ๋Š” ๊ฒƒ์ด ์ข‹๋‹ค.

Topic์˜ Partition

์™œ Topic์€ ์—ฌ๋Ÿฌ๊ฐœ์˜ Partition ์œผ๋กœ ๋‚˜๋ˆŒ๊นŒ?
์นดํ”„์นด์˜ ํ† ํ”ฝ์— ๋ฉ”์„ธ์ง€๊ฐ€ ์“ฐ์—ฌ์ง€๋Š” ๊ฒƒ๋„ ์–ด๋А์ •๋„ ์‹œ๊ฐ„์ด ์†Œ๋น„๋˜๋Š”๋ฐ ๋งŒ์•ฝ ๋ช‡ ์ฒœ๊ฑด์˜ ๋ฉ”์„ธ์ง€๊ฐ€ ๋™์‹œ์— ์นดํ”„์นด์— write ๋˜๋ฉด ๋ณ‘๋ชฉํ˜„์ƒ์ด ๋ฐœ์ƒํ•  ๊ฒƒ์ด๋‹ค. ๋”ฐ๋ผ์„œ ์—ฌ๋Ÿฌ๊ฐœ์˜ Partition์œผ๋กœ ๋‚˜๋ˆ„์–ด ์ฒ˜๋ฆฌํ•˜๋ฉด ์žฅ์ ์ด ๋งŽ๋‹ค.

๋‚˜ ๊ฐ™์€ ๊ฒฝ์šฐ ๋‹จ์ผ ํ† ํ”ฝ์— 5๊ฐœ์˜ ํŒŒํ‹ฐ์…˜์„ ๋‘์—ˆ๋‹ค.

roomIdhash(roomId) % 5ํŒŒํ‹ฐ์…˜์„ค๋ช…
10111101 ํ•ด์‹œ ๊ฒฐ๊ณผ๊ฐ€ 1 โ†’ ํŒŒํ‹ฐ์…˜ 1์— ์ €์žฅ
20222202 ํ•ด์‹œ ๊ฒฐ๊ณผ๊ฐ€ 2 โ†’ ํŒŒํ‹ฐ์…˜ 2์— ์ €์žฅ
30333303 ํ•ด์‹œ ๊ฒฐ๊ณผ๊ฐ€ 3 โ†’ ํŒŒํ‹ฐ์…˜ 3์— ์ €์žฅ
40444404 ํ•ด์‹œ ๊ฒฐ๊ณผ๊ฐ€ 4 โ†’ ํŒŒํ‹ฐ์…˜ 4์— ์ €์žฅ
50500505 ํ•ด์‹œ ๊ฒฐ๊ณผ๊ฐ€ 0 โ†’ ํŒŒํ‹ฐ์…˜ 0์— ์ €์žฅ
60611606 ํ•ด์‹œ ๊ฒฐ๊ณผ๊ฐ€ 1 โ†’ ํŒŒํ‹ฐ์…˜ 1์— ์ €์žฅ
  • ์ฒ˜๋ฆฌ๋Ÿ‰ ํ™•์žฅ(Scalability): ํ•˜๋‚˜์˜ ํŒŒํ‹ฐ์…˜๋งŒ ์“ฐ๋ฉด ์ฒ˜๋ฆฌ ์šฉ๋Ÿ‰์ด ์ œํ•œ๋˜์ง€๋งŒ, 5๊ฐœ์˜ ํŒŒํ‹ฐ์…˜์œผ๋กœ ๋‚˜๋ˆ„๋ฉด ๋™์‹œ์— 5๊ฐœ ์“ฐ๊ธฐยท์ฝ๊ธฐ ์ž‘์—…์„ ๋ณ‘๋ ฌ๋กœ ํ•  ์ˆ˜ ์žˆ๋‹ค.
  • ๋ณ‘๋ ฌ ์†Œ๋น„(Parallelism): ์ปจ์Šˆ๋จธ ๊ทธ๋ฃน์—์„œ ๊ฐ ํŒŒํ‹ฐ์…˜์„ ๋‹ค๋ฅธ ์ปจ์Šˆ๋จธ๊ฐ€ ๋งก์•„ ์ฝ๊ธฐ ๋•Œ๋ฌธ์— ์ฒ˜๋ฆฌ ์†๋„๊ฐ€ ๋นจ๋ผ์ง„๋‹ค.
  • ์ˆœ์„œ ๋ณด์žฅ(Ordering): ๊ฐ™์€ roomId ๊ฐ™์€ ํŒŒํ‹ฐ์…˜(์˜ˆ: ํ•ด์‹œ ๊ฒฐ๊ณผ๋กœ ํŒŒํ‹ฐ์…˜ 1)์— ๋ชจ์•„ ์ˆœ์ฐจ์ ์œผ๋กœ ์ฒ˜๋ฆฌ๊ฐ€ ๊ฐ€๋Šฅํ•˜๋‹ค.
  • ์žฅ์•  ๊ฒฉ๋ฆฌ(Fault Isolation): ํŠน์ • ํŒŒํ‹ฐ์…˜์ด ์žˆ๋Š” ๋ธŒ๋กœ์ปค๊ฐ€ ์žฅ์•  ๋‚˜๋„, ๋‚˜๋จธ์ง€ ํŒŒํ‹ฐ์…˜์€ ๊ณ„์† ๋™์ž‘ํ•ด ์ „์ฒด ์„œ๋น„์Šค ๊ฐ€์šฉ์„ฑ์„ ๋†’์ผ ์ˆ˜ ์žˆ๋‹ค.

Kafka ํ”„๋กœ๋“€์„œ๋Š” ๊ธฐ๋ณธ์ ์œผ๋กœ ํ‚ค(key)๊ฐ€ ์—†๋Š” ๋ฉ”์‹œ์ง€๋ฅผ ๋ณด๋‚ผ ๋•Œ, ์„ค์ •๋œ ํŒŒํ‹ฐ์…˜ ์ˆ˜๋ฅผ ๊ธฐ์ค€์œผ๋กœ ๋ผ์šด๋“œ ๋กœ๋นˆ(Roundโ€‘Robin) ๋ฐฉ์‹์œผ๋กœ ์ˆœ์ฐจ ํ• ๋‹นํ•œ๋‹ค. ๋”ฐ๋ผ์„œ ํ•˜๋‚˜์˜ ํŒŒํ‹ฐ์…˜ ๋‚ด์—์„œ๋Š” ๋ฉ”์„ธ์ง€ ์ˆœ์„œ๊ฐ€ ๋ณด์žฅ๋˜์ง€๋งŒ, ํŒŒํ‹ฐ์…˜์ด ์—ฌ๋Ÿฌ๊ฐœ์ผ ๊ฒฝ์šฐ์—๋Š” ์ˆœ์„œ๊ฐ€ ๋ณด์žฅ์•Š๋Š”๋‹ค.

๋‚˜๋Š” ํ‚ค(key) ๊ธฐ๋ฐ˜ ํŒŒํ‹ฐ์…”๋‹์„ ์‚ฌ์šฉํ•˜๋ฉด, ํ”„๋กœ๋“€์„œ๊ฐ€ ๋ฉ”์‹œ์ง€๋ฅผ ๋ณด๋‚ผ ๋•Œ roomId๋ฅผ ํ‚ค๋กœ ์ง€์ •ํ•œ๋‹ค.
์ฆ‰, โ€œํ‚ค๋ฅผ roomId๋กœ ์„ค์ •ํ–ˆ๋‹คโ€๋Š” ๊ฑด, ๋™์ผํ•œ ๋ฐฉ์˜ ๋ฉ”์‹œ์ง€๋ผ๋ฆฌ ์ „์šฉ ํŒŒํ‹ฐ์…˜์— ๋ชจ์•„ ์ˆœ์„œ๋ฅผ ๋ณด์žฅํ•˜๋ผ๋Š” ๋œป์ด๋‹ค.

ํ•˜์ง€๋งŒ ๋‹ค๋งŒ, ํ•œ๋ฒˆ ๋Š˜๋ฆฐ ํŒŒํ‹ฐ์…˜์€ ์ ˆ๋Œ€ ์ค„์ผ ์ˆ˜ ์—†๊ธฐ ๋•Œ๋ฌธ์— ์ฒ˜์Œ์— ๋งŽ์€ ๊ณ ๋ฏผ์„ ํ•˜๊ณ  ์„ค๊ณ„๋ฅผ ํ•ด์•ผํ•œ๋‹ค.

์ปจ์Šˆ๋จธ ๊ทธ๋ฃน(Consumer Group)

  • ์นดํ”„์นด์˜ ๊ฐœ๋ณ„ Consumer ์ธ์Šคํ„ด์Šค๋ฅผ ํ•˜๋‚˜๋กœ ๋ฌถ๋Š” ๋…ผ๋ฆฌ์  ๊ทธ๋ฃน ๋‹จ์œ„
  • ๋งŒ์•ฝ Consumer1 ์— ์žฅ์• ๊ฐ€ ๋ฐœ์ƒํ•˜๋”๋ผ๋„ ๋™์ผ Consumer Group์— ์žˆ๋Š” Consumer ๋“ค์ด ์ฒ˜๋ฆฌ ๊ฐ€๋Šฅํ•˜๊ฒŒ ํ•จ
Partitionํ• ๋‹น๋œ Consumer
0Consumer 1
1Consumer 2
2Consumer 3
3Consumer 4
4Consumer 5

  • ๊ฐ ํŒŒํ‹ฐ์…˜์€ Consumer Group ๋‚ด ํ•˜๋‚˜์˜ Consumer์—๊ฒŒ๋งŒ ํ• ๋‹น๋จ โ†’ ๋ณ‘๋ ฌ ์ฒ˜๋ฆฌ๊ฐ€ ๊ฐ€๋Šฅ. ์ฒ˜๋ฆฌ ์†๋„ ํ–ฅ์ƒ

  • ๊ฐ™์€ Consumer Group์— ์†ํ•œ ์ปจ์Šˆ๋จธ๋“ค์€ ์ค‘๋ณต ์—†์ด ๋ฐ์ดํ„ฐ ์†Œ๋น„

  • ๋งŒ์•ฝ Consumer ์ˆ˜ > Partition ์ˆ˜๋ฉด, ์ผ๋ถ€ Consumer๋Š” ๋†€๊ฒŒ ๋จ

  • Consumer Group์ด ๋‹ค๋ฅด๋ฉด ๋ชจ๋“  ๊ทธ๋ฃน์ด ๋…๋ฆฝ์ ์œผ๋กœ ๋™์ผ ๋ฐ์ดํ„ฐ ์†Œ๋น„ ๊ฐ€๋Šฅ

  • ํ•œ ๊ฐœ์˜ Consumer๋Š” ์—ฌ๋Ÿฌ ๊ฐœ์˜ ํ† ํ”ฝ์„ ์ฒ˜๋ฆฌ ๊ฐ€๋Šฅ

STOMP + Kafka

์ด๋ฒˆ ํ”„๋กœ์ ํŠธ์—์„œ ๋‚˜๋Š” ์‹ค์‹œ๊ฐ„์œผ๋กœ ์ฑ„ํŒ… ํ•  ์ˆ˜์žˆ๋Š” ์‹œ์Šคํ…œ์„ ๋งŒ๋“ค์–ด๋ณผ๋ ค๊ณ  ํ–ˆ๋‹ค. ๋”ฐ๋ผ์„œ ์•„์ง ๊ฐœ๋…์€ ๋ถ€์กฑํ•˜์ง€๋งŒ ๊ทธ๋ž˜๋„ ์„ฑ๊ณต(?)์€ ํ–ˆ์œผ๋‹ˆ ๊ธฐ๋กํ•ด๋ณผ๋ ค๊ณ  ํ•œ๋‹ค. ์•„๋งˆ ์ž˜๋ชป๋œ ๋ถ€๋ถ„๋„ ๋งŽ์•„์„œ ์ฐจํ›„์— ์ˆ˜์ •์„ ํ•ด์•ผํ•  ๊ฒƒ ๊ฐ™๋‹ค.

WebSocket/STOMP

@Configuration
@RequiredArgsConstructor
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

  private final StompHandler stompHandler;

  @Override
  public void configureMessageBroker(MessageBrokerRegistry registry) {
    registry.setApplicationDestinationPrefixes("/send");
  }

  @Override
  public void registerStompEndpoints(StompEndpointRegistry registry) {
    registry
        .addEndpoint("/ws-stomp")
        .setAllowedOriginPatterns("http://localhost:3000")
        .withSockJS();
  }

  @Override
  public void configureClientInboundChannel(ChannelRegistration registration) {
    registration.interceptors(stompHandler);
  }
}

์Šคํ”„๋ง ๋‚ด์žฅ ๋ฉ”๋ชจ๋ฆฌ ๋ธŒ๋กœ์ปค์ธ simpleBroker("/topic") ์„ค์ •์€ ์ œ๊ฑฐํ•˜์˜€๋‹ค. ์ด์ œ ๋ฉ”์‹œ์ง€ ๋ธŒ๋กœ์ปค๋กœ์„œ ์Šคํ”„๋ง์˜ ๊ธฐ๋ณธ Simple Broker ๋Œ€์‹  Kafka๋ฅผ ์‚ฌ์šฉํ•˜๊ธฐ ๋•Œ๋ฌธ์ด๋‹ค. Kafka๋ฅผ ํ†ตํ•ด ๋ฉ”์‹œ์ง€๋ฅผ ๋ฐœํ–‰(Publish)ํ•˜๊ณ  ๊ตฌ๋…(Subscribe)ํ•˜๋Š” ๊ตฌ์กฐ๋กœ ์ „ํ™˜ํ•จ์— ๋”ฐ๋ผ, ๋” ์ด์ƒ enableSimpleBroker() ์„ค์ •์ด ํ•„์š”ํ•˜์ง€ ์•Š๋‹ค.

  • ํด๋ผ์ด์–ธํŠธ๊ฐ€ SockJS(STOMP)๋กœ /ws-stomp ์—”๋“œํฌ์ธํŠธ์— ์—ฐ๊ฒฐํ•˜๋„๋ก ์„ค์ •
  • ํด๋ผ์ด์–ธํŠธ๊ฐ€ ์„œ๋ฒ„๋กœ ๋ฉ”์‹œ์ง€๋ฅผ ๋ณด๋‚ผ ๋•Œ URL ์•ž์— /send๋ฅผ ๋ถ™์ด๋„๋ก ์ง€์ •

KafkaProducerConfig

@Configuration
public class KafkaProducerConfig {

  @Value("${spring.kafka.bootstrap-servers}")
  private String bootstrapServers;

  public Map<String, Object> producerConfig() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    return props;
  }

  @Bean
  public ProducerFactory<String, ChatRequest> chatRequestProducerFactory() {
    return new DefaultKafkaProducerFactory<>(
        producerConfig(),
        new StringSerializer(),
        new JsonSerializer<ChatRequest>()
    );
  }

  @Bean
  public KafkaTemplate<String, ChatRequest> chatRequestKafkaTemplate(
      ProducerFactory<String, ChatRequest> chatRequestProducerFactory) {
    return new KafkaTemplate<>(chatRequestProducerFactory);
  }
}
  • bootstrapServers(application.yml์˜ spring.kafka.bootstrap-servers)๋ฅผ ํ†ตํ•ด Kafka ํด๋Ÿฌ์Šคํ„ฐ ์ฃผ์†Œ๋ฅผ ์ง€์ •
  • ๋ฉ”์‹œ์ง€ ํ‚ค๋Š” ๋ฌธ์ž์—ด(StringSerializer), ๊ฐ’์€ JSON ํ˜•ํƒœ(JsonSerializer)๋กœ ์ง๋ ฌํ™”
  • ChatRequest ๊ฐ์ฒด๋ฅผ value๋กœ ๋ณด๋‚ด๊ธฐ ์œ„ํ•œ ProducerFactory์™€ KafkaTemplate ์ƒ์„ฑ
  • ๋‚ด๋ถ€์ ์œผ๋กœ JsonSerializer๊ฐ€ DTO๋ฅผ JSON์œผ๋กœ ๋ณ€ํ™˜ํ•˜๋ฉฐ, StringSerializer๊ฐ€ ํ‚ค(roomId)๋ฅผ ์ง๋ ฌํ™”

KafkaProducer

@Component
@RequiredArgsConstructor
public class KafkaProducer {

  private final KafkaTemplate<String, ChatRequest> chatRequestKafkaTemplate;

  public void send(String topic, String roomId, ChatRequest message) {
    chatRequestKafkaTemplate.send(topic, roomId, message);
  }
}
  • Kafka ๋ฉ”์‹œ์ง€๋ฅผ ํ•œ ๊ณณ์—์„œ ๋ณด๋‚ผ ์ˆ˜ ์žˆ๋„๋ก ์„ค๊ณ„๋œ ๋ž˜ํผ

ChatController

@Controller
@RequiredArgsConstructor
public class ChatController {

  private final KafkaProducer kafkaProducer;

  @MessageMapping("/chat/{roomId}")
  public void sendChat(
      ChatRequest incoming,
      @DestinationVariable("roomId") Long roomId,
      Principal principal
  ) {
    ChatMemberDetails userDetails = getUserDetails(
        (UsernamePasswordAuthenticationToken) principal);
    Long memberId = userDetails.getMemberId();
    String userName = userDetails.getNickName();

    ChatRequest enriched = createChatRequest(incoming, memberId, userName);

    kafkaProducer.send(
        "chat-messages",
        roomId.toString(),
        enriched
    );
  }

1. ํด๋ผ์ด์–ธํŠธ โ†’ WebSocket ์ „์†ก

  • ํ”„๋ก ํŠธ์—”๋“œ์—์„œ /send/chat/{roomId}๋กœ STOMP ๋ฉ”์‹œ์ง€๋ฅผ ๋ณด๋‚ธ๋‹ค.
  • ๋ฉ”์‹œ์ง€๋Š” WebSocket์„ ํ†ตํ•ด @MessageMapping("/chat/{roomId}")์— ๋„๋‹ฌํ•œ๋‹ค.
  • @DestinationVariable("roomId") Long roomId : ๊ฒฝ๋กœ์˜ {roomId} ๊ฐ’์„ ์ถ”์ถœํ•˜์—ฌ ์ฑ„ํŒ…๋ฐฉ ์‹๋ณ„์ž๋กœ ์‚ฌ์šฉ

2. Spring Controller ์ˆ˜์‹ 

  • Kafka๋กœ ๋น„๋™๊ธฐ ์ „์†ก
  • kafkaTemplate.send("chat-messages", roomId, enriched) ํ˜ธ์ถœ
  • โ€œchat-messagesโ€ ํ† ํ”ฝ์— ํ‚ค๋ฅผ ๋ฐฉ๋ฒˆํ˜ธ(roomId)๋กœ ์ง€์ • ๋ณด๋‚ธ๋‹ค.
  • ๋‹ค๋ฅธ ์„œ๋ฒ„๋‚˜ ์„œ๋น„์Šค(๋˜๋Š” ๊ฐ™์€ ์–ดํ”Œ๋ฆฌ์ผ€์ด์…˜ ๋‚ด Kafka ๋ฆฌ์Šค๋„ˆ)๊ฐ€ ์ด ๋ฉ”์‹œ์ง€๋ฅผ ๋ฐ›์•„ ์‹ค์ œ DB ์ €์žฅ, WebSocket ๋ธŒ๋กœ๋“œ์บ์ŠคํŠธ ๋“ฑ์„ ์ฒ˜๋ฆฌํ•˜๊ฒŒ ๋œ๋‹ค.

KafkaConsumerConfig

@Configuration
public class KafkaConsumerConfig {

@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;

public Map<String, Object> consumerConfig(String groupId) {
  Map<String, Object> props = new HashMap<>();
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
  props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
  return props;
}

@Bean
public ConsumerFactory<String, ChatRequest> chatRequestConsumerFactory() {
  JsonDeserializer<ChatRequest> deserializer = new JsonDeserializer<>(ChatRequest.class);
  deserializer.addTrustedPackages("yuhan.pro.chatserver.domain.dto");
  String chatGroup = "${spring.application.name}-${random.uuid}";
  return new DefaultKafkaConsumerFactory<>(
      consumerConfig(chatGroup),
      new StringDeserializer(),
      deserializer
  );
}

@Bean(name = "chatMessageFactory")
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, ChatRequest>> factory(
    ConsumerFactory<String, ChatRequest> chatRequestConsumerFactory) {
  ConcurrentKafkaListenerContainerFactory<String, ChatRequest> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
  factory.setConsumerFactory(chatRequestConsumerFactory);
  factory.setConcurrency(5);
  return factory;
}
  • bootstrapServers: application.yml ๋“ฑ์— ์ •์˜๋œ Kafka ํด๋Ÿฌ์Šคํ„ฐ ์ฃผ์†Œ

  • ํ‚ค ์—ญ์ง๋ ฌํ™”: ๋ฉ”์‹œ์ง€ ํ‚ค๋ฅผ StringDeserializer๋กœ

  • ์˜คํ”„์…‹ ๋ฆฌ์…‹: ์ปจ์Šˆ๋จธ ์‹œ์ž‘ ์‹œ์— latest๋กœ ์„ค์ •ํ•ด, ๊ฐ€์žฅ ๋งˆ์ง€๋ง‰(์ตœ์‹ ) ๋ฉ”์‹œ์ง€๋ถ€ํ„ฐ ์ฝ๊ธฐ
    -> earliest๋Š”
    ์ปจ์Šˆ๋จธ๊ฐ€ ์ฒ˜์Œ ํ•ฉ๋ฅ˜ํ•˜๊ฑฐ๋‚˜ ์œ ํšจํ•œ ์˜คํ”„์…‹์ด ์—†์„ ๋•Œ, ํ† ํ”ฝ์˜ ๊ฐ€์žฅ ์ฒ˜์Œ๋ถ€ํ„ฐ(๊ฐ€์žฅ ์˜ค๋ž˜๋œ ๋ฉ”์‹œ์ง€) ์ฝ๊ธฐ ์‹œ์ž‘

    ์ƒํ™ฉ๊ถŒ์žฅ ์„ค์ •
    ์‹ค์‹œ๊ฐ„ ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ๋งŒ ์ค‘์š”ํ•˜๊ณ  ๊ณผ๊ฑฐ ์ด๋ ฅ์€ ํ•„์š” ์—†์„ ๋•Œlatest
    ์„œ๋น„์Šค ์ดˆ๊ธฐํ™” ์‹œ ํ† ํ”ฝ ์ „์ฒด ์ด๋ ฅ์„ ๋กœ๋“œํ•˜๋ฉด์„œ ์ƒํƒœ๋ฅผ ๊ตฌ์ถ•ํ•  ๋•Œearliest
    ์ด๋ฏธ ์ฒ˜๋ฆฌ๋œ ๋ฉ”์‹œ์ง€๊ฐ€ ๋‹ค์‹œ ์žฌ์ฒ˜๋ฆฌ๋˜๋Š” ๊ฒƒ์„ ํ”ผํ•˜๊ณ  ์‹ถ์„ ๋•Œlatest
    ์žฌ์‹œ์ž‘ ์‹œ์—๋„ ๊ณผ๊ฑฐ ์ฒ˜๋ฆฌํ•˜์ง€ ๋ชปํ•œ ๋ฉ”์‹œ์ง€๋ถ€ํ„ฐ ์ด์–ด์„œ ์ฒ˜๋ฆฌํ•˜๊ณ  ์‹ถ์„ ๋•Œearliest
  • ๊ทธ๋ฃน ์•„์ด๋””: ํŒŒ๋ผ๋ฏธํ„ฐ๋กœ ๋ฐ›์€ groupId๋ฅผ ์‚ฌ์šฉํ•ด, ์ปจ์Šˆ๋จธ ๊ทธ๋ฃน์„ ๋™์  ์ƒ์„ฑ
  • JsonDeserializer: JSON์œผ๋กœ ์ง๋ ฌํ™”๋œ ChatRequest ๋ฉ”์‹œ์ง€๋ฅผ DTO๋กœ ์—ญ์ง๋ ฌํ™”
  • addTrustedPackages: ์—ญ์ง๋ ฌํ™” ๊ฐ€๋Šฅํ•œ ํŒจํ‚ค์ง€ ๋ฒ”์œ„๋ฅผ ์ง€์ •ํ•ด ๋ณด์•ˆ ๊ฐ•ํ™”
  • ๋™์  ๊ทธ๋ฃน ID: ${spring.application.name}์— ๋žœ๋ค UUID๋ฅผ ๋ถ™์—ฌ ์ปจ์Šˆ๋จธ ๊ทธ๋ฃน์„ ๊ณ ์œ ํ•˜๊ฒŒ ์ƒ์„ฑ
    -> ๊ฐ ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜ ์ธ์Šคํ„ด์Šค๊ฐ€ ํ† ํ”ฝ์˜ ๋ชจ๋“  ๋ฉ”์‹œ์ง€๋ฅผ ๋น ์ง์—†์ด ๋ฐ›์•„์•ผํ•˜๊ธฐ ๋•Œ๋ฌธ์— ์„œ๋ฒ„๋งˆ๋‹ค ๊ณ ์œ ํ•˜๊ฒŒ ์ปจ์Šˆ๋จธ ๊ทธ๋ฃน์„ ์ƒ์„ฑํ•˜๊ฒŒ ํ–ˆ๋‹ค.
  • DefaultKafkaConsumerFactory: ์œ„ ํ”„๋กœํผํ‹ฐ์™€ ํ‚ค/๊ฐ’ ์—ญ์ง๋ ฌํ™”๊ธฐ๋ฅผ ์กฐํ•ฉํ•ด ํŒฉํ† ๋ฆฌ ๊ฐ์ฒด ์ƒ์„ฑ

KafkaListeners

@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaListeners {

private final SimpMessageSendingOperations messagingTemplate;
private final ChatService chatService;

@KafkaListener(
    topics = "chat-messages",
    groupId = "${spring.application.name}-${random.uuid}",
    containerFactory = "chatMessageFactory"
)
public void handleChatMessage(
    ChatRequest message,
    @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
    @Header(KafkaHeaders.RECEIVED_KEY) String roomId
) {
  try {
    log.info("Kafka ์ˆ˜์‹  - roomId: {}, partition: {}, message: {}", roomId, partition, message);
    messagingTemplate.convertAndSend("/topic/rooms/" + roomId, message);
    chatService.saveChat(message, Long.valueOf(roomId));
  } catch (Exception e) {
    log.error("Kafka ๋ฉ”์‹œ์ง€ ์ฒ˜๋ฆฌ ์˜ค๋ฅ˜", e);
  }
}
  • topics = "chat-messages"
    โ†’ ์ด ๋ฆฌ์Šค๋„ˆ ๋ฉ”์„œ๋“œ๋Š” chat-messages ํ† ํ”ฝ์— ๋ฐœํ–‰๋œ ๋ชจ๋“  ๋ฉ”์‹œ์ง€๋ฅผ ๊ตฌ๋…

  • groupId = "${spring.application.name}-${random.uuid}"
    โ†’ ์ธ์Šคํ„ด์Šค๋ณ„(ํ˜น์€ ์„ธ์…˜๋ณ„)๋กœ ๊ณ ์œ ํ•œ ์ปจ์Šˆ๋จธ ๊ทธ๋ฃน์„ ๋งŒ๋“ค์–ด, ๊ฐ ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜ ์ธ์Šคํ„ด์Šค๊ฐ€ ๋ชจ๋“  ๋ฉ”์‹œ์ง€๋ฅผ ๋…๋ฆฝ์ ์œผ๋กœ ๋ฐ›์•„ ์ฒ˜๋ฆฌํ•˜๋„๋ก ๋ณด์žฅ

  • containerFactory = "chatMessageFactory"
    โ†’ KafkaConsumerConfig์—์„œ ์„ค์ •ํ•œ ConcurrentKafkaListenerContainerFactory<String, ChatRequest> ๋นˆ์„ ์‚ฌ์šฉํ•ด, JSON โ†’ ChatRequest ์—ญ์ง๋ ฌํ™”์™€ ๋ณ‘๋ ฌ์„ฑ ์„ค์ •(Concurrency = 5)์„ ์ ์šฉ

  • @Header(KafkaHeaders.RECEIVED_KEY) String roomId

    kafkaProducer.send("chat-messages", roomId.toString(), enriched);

    -> ์ด ๊ฐ’์€ ํ”„๋กœ๋“€์„œ(์—ฌ๊ธฐ์„œ๋Š” kafkaProducer.send(โ€ฆ, roomId, enriched))์—์„œ ๋ฉ”์‹œ์ง€๋ฅผ ๋ณด๋‚ผ ๋•Œ ์ง€์ •ํ•œ ํ‚ค(key) ์™€ ๋™์ผ

  • @Header(KafkaHeaders.RECEIVED_PARTITION) int partition
    -> ํ‚ค ๊ธฐ๋ฐ˜ ํŒŒํ‹ฐ์…”๋„ˆ๋ฅผ ์“ฐ๊ณ  ์žˆ๋‹ค๋ฉด(๊ธฐ๋ณธ ํŒŒํ‹ฐ์…”๋„ˆ๋Š” ํ‚ค์˜ ํ•ด์‹œ๊ฐ’์„ ์‚ฌ์šฉ) ๊ฐ™์€ ํ‚ค(roomId)์— ๋Œ€ํ•ด ํ•ญ์ƒ ๋™์ผํ•œ ํŒŒํ‹ฐ์…˜์„ ์„ ํƒ

  • WebSocket ๋ธŒ๋กœ๋“œ์บ์ŠคํŠธ
    messagingTemplate.convertAndSend("/topic/rooms/" + roomId, message)
    โ†’ STOMP ๊ตฌ๋… ๊ฒฝ๋กœ /topic/rooms/{roomId}๋กœ ๋“ค์–ด์™€ ์žˆ๋Š” ๋ชจ๋“  WebSocket ํด๋ผ์ด์–ธํŠธ์—๊ฒŒ ChatRequest๋ฅผ ์ „์†ก

    ํ˜„์žฌ ๋‚˜๋Š” ๋‹จ์ผ ๋ธŒ๋กœ์ปค(๋ธŒ๋กœ์ปค 1๊ฐœ)๋ฅผ ์‚ฌ์šฉ ์ค‘์ด์ง€๋งŒ ๋งŒ์•ฝ ๋‹จ์ผ ๋ธŒ๋กœ์ปค๊ฐ€ ๋‹ค์šด๋˜๋ฉด ํด๋Ÿฌ์Šคํ„ฐ ์ „์ฒด๊ฐ€ ๋งˆ๋น„๋œ๋‹ค. ๋”ฐ๋ผ์„œ ๋ณดํ†ต ๋ธŒ๋กœ์ปค ์ˆ˜๋ฅผ ๋Š˜๋ ค 3๋Œ€ ์ •๋„๋กœ ์šด์˜ํ•˜๋Š” ๊ฒƒ์ด ์ผ๋ฐ˜์ ์ด๋‹ค. ํ•˜์ง€๋งŒ ์ด ํ”„๋กœ์ ํŠธ๋Š” ์‚ฌ์ด๋“œ ํ”„๋กœ์ ํŠธ๋กœ ๊ฐœ๋ฐœ ๋‹จ๊ณ„์— ์žˆ์–ด, ํŽธ์˜์ƒ Kafka ๋ธŒ๋กœ์ปค๋ฅผ ๋‹จ์ผ ์ธ์Šคํ„ด์Šค(๋ณต์ œ ํŒฉํ„ฐ=1)๋กœ๋งŒ ์šด์˜ํ•˜๊ณ  ์žˆ๋‹ค.

์ฐธ๊ณ :

profile
๋…ธ๋ ฅ์€ ๋ฐฐ์‹ ํ•˜์ง€ ์•Š์•„ ๐Ÿ”ฅ

0๊ฐœ์˜ ๋Œ“๊ธ€