
์ฌ์ด๋ ํ๋ก์ ํธ๋ก STOMP ๊ธฐ๋ฐ ์ฑํ
์์คํ
์ ๊ฐ๋ฐํ๋ค.
๋จผ์ ๋ฉ์ธ ์๋ฒ์ ์ฑํ
์๋ฒ๋ฅผ ๋ถ๋ฆฌํ์ฌ ๊ตฌ์ถํ๋๋ฐ, ์ค์ ์ด์ ํ๊ฒฝ์์ ์ฑํ
์๋ฒ๋ฅผ ์ค์ผ์ผ ์์(Scale Out)ํด์ผ ํ ๊ฒฝ์ฐ์๋ ๋ฉ์์ง๊ฐ ์ค์๊ฐ์ผ๋ก ์ ๋ฌ๋๋์ง ๊ถ๊ธํ๋ค.
ํ
์คํธํด ๋ณธ ๊ฒฐ๊ณผ, ๋ถ์ฐ๋ ์ฑํ
์๋ฒ ๊ฐ์๋ ๋ฉ์์ง ๋๊ธฐํ๊ฐ ์ด๋ฃจ์ด์ง์ง ์์ ๋ค๋ฅธ ์ฌ์ฉ์์๊ฒ ์ฑํ
์ด ํ์๋์ง ์๋ ๋ฌธ์ ๊ฐ ๋ฐ์ํ๋ค.

๊ทธ ์ด์ ๋ ์ธ๋ฉ๋ชจ๋ฆฌ(Simple) ๋ธ๋ก์ปค ์ฌ์ฉํ๊ธฐ ๋๋ฌธ์ด๋ค.
Spring WebSocket์ ๊ธฐ๋ณธ STOMP ๋ธ๋ก์ปค(enableSimpleBroker)๋ ์๋ฒ ์ธ์คํด์ค ๋ด ๋ฉ๋ชจ๋ฆฌ ์์์๋ง ๋์ํ๋ค. ๋ฐ๋ผ์ ํ ๋
ธ๋์์ ๋ฐ์ ๋ฉ์์ง๋ ๊ทธ ๋
ธ๋์ ๋ฉ๋ชจ๋ฆฌ ๊ตฌ๋
์์๊ฒ๋ง ์ ๋ฌ๋๊ณ ๋ค๋ฅธ ๋
ธ๋์ ์ฐ๊ฒฐ๋ ํด๋ผ์ด์ธํธ๋ ์ ํ ๋ชจ๋ฅด๊ฒ ๋๋ค.
๋ฐ๋ผ์ ์ธ๋ถ ๋ธ๋ก์ปค๋ฅผ ์ฌ์ฉํด์ผํ๋๋ฐ ๋๋ Kafka๋ฅผ ๋์
ํด๋ณด๊ธฐ๋ก ๋ง์ ๋จน์๋ค.
Kafka, RabbitMQ, Redis Pub/Sub ์ค์ ์ง์ง ์ค๋์๊ฐ ๊ณ ๋ฏผ์ ํ์๋ค. ์ฒ์์๋ RabbitMQ๋ฅผ ์ฌ์ฉํ๋ ค๊ณ ํ์ง๋ง ์๊ฐ์ด ์ง๋ ์๋ก Kafka๋ฅผ ์ฌ์ฉํ๊ธฐ๋ก ๋ง์ ๋จน์๋ค. ์ฒ์์ RabbitMQ๋ฅผ ์ฌ์ฉํ๋ ค๊ณ ํ ๋ชฉ์ ์ ๋ฎ์ ์ฒ๋ฆฌ๋๊ณผ ์งง์ ์ง์ฐ์๊ฐ์ด๋ผ๋ ์ฅ์ ์ ๊ฐ์ง๊ณ ์์๊ธฐ ๋๋ฌธ์ด๋ค.

Redis Pub/Sub๋ ๋ฉ๋ชจ๋ฆฌ ๊ธฐ๋ฐ์ผ๋ก ๋น ๋ฅธ ์ฑ๋ฅ์ ์ ๊ณตํ์ง๋ง, ๋ฉ์์ง ์ ์ค ๊ฐ๋ฅ์ฑ์ด ๋๋ฌด ์ปธ๊ธฐ ๋๋ฌธ์ ์ฒ์๋ถํฐ ํ๋ณด์์ ์ ์ธํ๋ค. ์ด๋ ํ์ ํ์๋ ์คํธ๋ฆฌ๋ฐ ๊ฐ์ ์ฉ๋์๋ ์ ํฉํ๊ฒ ์ง๋ง, Slack์ด๋ ์นด์นด์คํก ๊ฐ์ ์ฑํ ์์คํ ์์๋ ์ฌ์ฉ์๊ฐ ์ ์๋ผ๋ ์ฐ๊ฒฐ์ด ๋๊ฒผ์ ๋ ๋์น ๋ฉ์์ง๋ฅผ ๋ณต๊ตฌํ ์ ์๋ ์น๋ช ์ ์ธ ๋จ์ ์ด ์กด์ฌํ๋ค.
RabbitMQ๋ ์ค์ ์ด ๊ฐํธํ๊ณ AMQP ํ๋กํ ์ฝ ๋๋ถ์ ๋น ๋ฅด๊ฒ ์์ํ ์ ์์ผ๋ฉฐ, ๋์คํฌ ์ฐ๊ธฐ ์์ด ๋ฉ๋ชจ๋ฆฌ๋ง์ผ๋ก ๋ฉ์์ง๋ฅผ ์ฃผ๊ณ ๋ฐ์ ๋ ์ง์ฐ์๊ฐ์ด ๋งค์ฐ ์งง๋ค๋ ์ฅ์ ์ด ์์๋ค. ์ฌ์ด๋ ํ๋ก์ ํธ๋ผ ์ ์ ๊ฐ ๋ช๋ฐฑ ๋ช ์์ค์ผ ๊ฒ์ด๋ผ ์์ํด โ๊ณผ๋ํ ํธ๋ํฝ์ด๋ ๋ณต์กํ ํํฐ์ ๋ ์์ด๋ ์ถฉ๋ถํ ์ปค๋ฒํ ์ ์๊ฒ ๋คโ๊ณ ํ๋จํ์ง๋ง, ์ค์ ๋ก ๋์ฉ๋ ๋ฉ์์ง ์ฒ๋ฆฌ ์ ์ฒ๋ฆฌ๋์ ํ๊ณ๊ฐ ๋ช ํํ๋ค๋ ๊ฒ์ ์๊ฒ๋์๋ค. k6๋ก ๋ถํ๋ฅผ ๊ฑธ์ด๋ณด๋ฉด ์ฅ์ ๊ฐ ๋ฐ์ํ ๊ฒ์ด ๋ถ๋ช ํ๊ณ , ๋ฉ์์ง๊ฐ ๊ณผ๋ํ๊ฒ ์์ผ ๊ฒฝ์ฐ ๋ฉ๋ชจ๋ฆฌ ๋ถ์กฑ์ผ๋ก ์์คํ ์ ์ฒด ์์ ์ฑ์ด ์ํ๋ฐ๋ ๋ฌธ์ ๋ ์ฐ๋ ค๋์๋ค.
๋ฐ๋ผ์ ๋์ ์ฒ๋ฆฌ๋, ๋ฉ์์ง ์ ์ค ๋ฐฉ์ง, ์์ ์ ์ธ ์ค์ผ์ผ ์์์ ๋ชจ๋ ์ถฉ์กฑํ๋ Kafka๋ฅผ ๋์ ํ๊ฒ ๋์๋ค.
์นดํ์นด(Kafka)๋ ๋ถ์ฐ ์คํธ๋ฆฌ๋ฐ ํ๋ซํผ์ผ๋ก, ๋๋์ ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ๊ณ ์ค์๊ฐ์ผ๋ก ์ ์กํ๋ ๋ฐ ์ฌ์ฉ๋๋ค. ๋ชจ๋ ๋ฐ์ดํฐ๋ ๋ก๊ทธ ํ์์ผ๋ก ํ์ผ ์์คํ ์ ๊ธฐ๋ก๋๋ค. ์ฌ๊ธฐ์ ๋งํ๋ ๋ก๊ทธ๋ ์ถ๊ฐ๋ง ๊ฐ๋ฅํ๋ฉฐ, ์๊ฐ์์ผ๋ก ์์ ํ ์ ๋ ฌ๋ ๋ฐ์ดํฐ์ ํ๋ฆ(๋ ์ฝ๋ ์ํ์ค)์ ์๋ฏธํ๋ค. ๋ก๊ทธ๋ฅผ ํ๊ณณ์ ๋ชจ์ ์ฒ๋ฆฌํ ์ ์๋๋ก ์ค์์ง์คํ๋์ด ์์ผ๋ฉฐ, ๋์ฉ๋ ๋ฐ์ดํฐ๋ฅผ ์์งํ๊ณ ์ค์๊ฐ ์คํธ๋ฆฌ๋ฐ์ผ๋ก ์๋น๊ฐ ๊ฐ๋ฅํ๋ค.
์ฐ์ ๋ฉ์ธ์ง/์ด๋ฒคํธ ๋ธ๋ก์ปค์ ๋ฉ์ธ์ง ํ๋ฅผ ์๋๊ฒ ๋ ์ข๋ค๊ณ ํ๋จํด์ ์ ๋ฆฌ ํ ๋ ค๊ณ ํ๋ค.

๋ฉ์์ง ํ(MQ)๋ ๋ฉ์์ง ์งํฅ ๋ฏธ๋ค์จ์ด(MOM : Message Oriented Middleware)๋ฅผ ๊ตฌํํ ์์คํ
์ผ๋ก ํ๋ก๊ทธ๋จ(ํ๋ก์ธ์ค) ๊ฐ์ ๋ฐ์ดํฐ๋ฅผ ๊ตํํ ๋ ์ฌ์ฉํ๋ ๊ธฐ์ ์ ์๋ฏธํ๋ค.
Producer: ๋ฉ์์ง๋ฅผ ๋ณด๋ด๋ ์ฃผ์ฒด
Consumer: ๋ฉ์์ง๋ฅผ ๋ฐ๋ ์ฃผ์ฒด
Queue: ๋ฉ์์ง๋ฅผ ์ฐจ๋ก๋ก ๋ณด๊ดํ๋ค๊ฐ ์ ๋ฌํ๋ ๋๊ธฐ์ค

๐ค ๋ฉ์ธ์ง ์งํฅ ๋ฏธ๋ค์จ์ด(MOM)??
- ์์ฉ ์ํํธ์จ์ด ๊ฐ์ ๋น๋๊ธฐ์ ๋ฐ์ดํฐ ํต์ ์ ์ํ ์ํํธ์จ์ด
- ์์ฐ์(producer)๋ ๋ฉ์์ง๋ฅผ ๋ณด๋ด๊ณ , ์๋น์(consumer)๋ ํ์ํ ๋ ๊บผ๋ด ์ฒ๋ฆฌ
- ์ฆ,
๋น๋๊ธฐ์ (Asynchronous)๋ฐฉ์์ ์ด์ฉํด์ ํ๋ก์ธ์ค๊ฐ์ ๋ฐ์ดํฐ๋ฅผ ์ฃผ๊ณ ๋ฐ๋ ๊ธฐ๋ฅ์ ์ํ ์์คํ
์ ํ๋ฆฌ์ผ์ด์
๊ฐ์ โ๋ฉ์์งโ๋ฅผ ์์ ํ๊ฒ ์ ๋ฌํด ์ฃผ๋ ์ค๊ฐ์
๋ฉ์ธ์ง ์งํฅ ๋ฏธ๋ค์์ด(Message-Oriented Middleware)๋ฅผ ๊ตฌํํ๋๋ฐ ์ฌ์ฉ๋๋ ์์คํ
pub/sub๊ตฌ์กฐ๋ผ๊ณ ํ๋ฉฐ ๋ํ์ ์ผ๋ก๋ Redis, RabbitMQ๊ฐ ์๋ค. ์์คํ
์์ ๋ฐ์ํ โ์ด๋ฒคํธ(์ฌ๊ฑด)โ๋ฅผ ๋ฐํ(publish)ํ๊ณ ,
๊ด์ฌ ์๋ ์๋น์(Subscriber)์๊ฒ ์ค์๊ฐ์ผ๋ก ์ ํํด ์ฃผ๋ ์ค๊ฐ์
๋ฉ์์ง ๋ธ๋ก์ปค๋ ์ด๋ฒคํธ ๋ธ๋ก์ปค๊ฐ ๋ ์ ์์ง๋ง, ์ด๋ฒคํธ ๋ธ๋ก์ปค๋ ๋ฉ์์ง ๋ธ๋ก์ปค ์ญํ ์ ํ ์ ์๋ค.
| ํน์ฑ | ๋ฉ์์ง ๋ธ๋ก์ปค | ์ด๋ฒคํธ ๋ธ๋ก์ปค |
|---|---|---|
| ๋ฐ์ดํฐ ์ฒ๋ฆฌ | ๋ฉ์์ง ์ฒ๋ฆฌ ํ ์ญ์ | ์ด๋ฒคํธ ์๊ตฌ ์ ์ฅ ๊ฐ๋ฅ |
| ์ฌ์ฒ๋ฆฌ | ์ ํ์ | ์ด๋ฒคํธ ์ฌ์ ๊ฐ๋ฅ |
| ํ์ฅ์ฑ | ์๋์ ์ผ๋ก ์ ํ์ | ๋์ ํ์ฅ์ฑ |
| ์ฉ๋ | ๋น๋๊ธฐ ์์ ์ฒ๋ฆฌ | ์ค์๊ฐ ์คํธ๋ฆผ ์ฒ๋ฆฌ |
| ๊ฒฐํฉ๋ | ์๋์ ์ผ๋ก ๋์ | ๋ฎ์ ๊ฒฐํฉ๋ |


์ด๋ฒคํธ(Event) : Kafka ์ ํตํด Producer ์ Consumer ๊ฐ ๋ฐ์ดํฐ๋ฅผ ์ฃผ๊ณ ๋ฐ๋ ๋จ์, ๋ฉ์ธ์ง
์นดํ์นด ํด๋ฌ์คํฐ(Kafka Cluster): Kafka Broker ๋ค์ ๋ชจ์, Kafka ๋ ํ์ฅ์ฑ๊ณผ ๊ณ ๊ฐ์ฉ์ฑ์ ์ํ์ฌ Broker ๋ค์ด ํด๋ฌ์คํฐ๋ก ๊ตฌ์ฑ
-> ์ฌ๋ฌ๊ฐ์ Kafka Broker ๋ก ์ด์์ ํ๋ฉด ๋์์ ๋ ๋ง์ ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ ์ ์์ผ๋ฉฐ, ํ์์ ๋ฐ๋ผ Broker ์ ์ถ๊ฐ ๊ฐ๋ฅ
-> ํด๋ฌ์คํฐ ๋ด ํ Broker์ ์ฅ์ ๊ฐ ๋ฐ์ํด๋ ๋ค๋ฅธ Broker๊ฐ ํํฐ์
๋ฆฌ๋๋ฅผ ์๋์ผ๋ก ์น๊ฒฉํด ๋ฌด์ค๋จ ์ด์์ด ๊ฐ๋ฅ
๋ธ๋ก์ปค(Broker) : ๊ฐ๊ฐ์ Kafka ์๋ฒ, ๋์ผ ๋
ธ๋์ ์ฌ๋ฌ ๋ธ๋ก์ปค ๊ฐ๋ฅ
-> ์คํ๋ Kafka ์๋ฒ๋ฅผ ๋งํ๋ฉฐ, Kafka ๊ทธ ์์ฒด
ํ๋ก๋์(Producer): Kafka์ ์ด๋ฒคํธ(๋ฉ์์ง)๋ฅผ ๋ฐํ(Publish)ํ๋ ์ญํ ์ ํ๋ ํด๋ผ์ด์ธํธ
์ปจ์๋จธ(Consumer): ํน์ ํ ํฝ(Topic)์ ๊ตฌ๋
(Subscribe)ํ๊ณ , ๊ทธ ์์ ๋ด๊ธด ์ด๋ฒคํธ๋ฅผ ๋ฐ์ ์ฒ๋ฆฌํ๋ ํด๋ผ์ด์ธํธ
ํ ํฝ(Topic) : Kafka์์ ๋ฉ์์ง๋ฅผ ๋ถ๋ฅํ๊ณ ์ ์ฅํ๋ ๋
ผ๋ฆฌ์ ์ธ ๊ณต๊ฐ, ํ์ผ์์คํ
์ ํด๋์ ์ ์ฌ
ํํฐ์
(Partition) : Topic์ ์ฌ๋ฌ Broker์ ๋ถ์ฐ๋์ด ์ ์ฅ๋๋ฉฐ, ์ด๋ ๊ฒ ๋ถ์ฐ๋ topic์ partition์ด๋ผ๊ณ ํจ
-> ํ ํฝ์ ์ฌ๋ฌ ๊ฐ๋ก ๋๋ ์์ ๋จ์
์คํ์
(offset) : ์ปจ์๋จธ์์ ๋ฉ์ธ์ง๋ฅผ ์ด๋๊น์ง ์ฝ์๋์ง ์ ์ฅํ๋ ๊ฐ
-> ์ปจ์๋จธ ๊ทธ๋ฃน์ ์ปจ์๋จธ๋ค์ ๊ฐ๊ฐ์ ํํฐ์
์ ์์ ์ด ๊ฐ์ ธ๊ฐ ๋ฉ์์ง์ ์์น ์ ๋ณด(offset) ์ ๊ธฐ๋ก
-> ์ปจ์๋จธ ์ฅ์ ๋ฐ์ ํ ๋ค์ ์ด์๋๋, ์ ์ ๋ง์ง๋ง์ผ๋ก ์ฝ์๋ ์์น์์๋ถํฐ ๋ค์ ์ฝ๊ธฐ ๊ฐ๋ฅ
chat-messages๋ ์ฑํ
์ ์ฉ ์ฐ์ฒดํต, order-events๋ ์ฃผ๋ฌธ ์ ์ฉ ์ฐ์ฒดํต์ด๋ผ๊ณ ์๊ฐ
์์์ ์์ฑํ Pub/Sub ๊ตฌ์กฐ๋ ๋งค์ฐ ์ ์ฌํ๊ฒ ์๋ํ๋ค. Producer๋ Topic์ ์ด๋ฒคํธ๋ฅผ ๋ณด๋ด๊ณ , ์ด ์ด๋ฒคํธ๋ Topic์ ๊ฐ Partition์ ๋ถ์ฐ๋์ด ์ ์ฅ๋๋ค. ํน์ Topic์ ๊ตฌ๋
ํ๊ณ ์๋ Consumer Group๋ด์ Consumer๋ ๊ฐ๊ฐ 1๊ฐ ์ด์์ Partition ์ผ๋ก ๋ถํฐ ์ด๋ฒคํธ๋ฅผ ๊ฐ์ ธ์จ๋ค. ๋ง์ฝ Partition ๊ฐฏ์๋ณด๋ค Consumer๊ฐ ๋ง๋ค๋ฉด, ์๋ฌด๊ฒ๋ ์ํ๋ Consumer๊ฐ ๋ง์์ง ๊ฐ๋ฅ์ฑ์ด ์กด์ฌํ๊ธฐ ๋๋ฌธ์ ํญ์ Partition ์๋ฅผ Consumer๋ณด๋ค ๊ฐ๊ฑฐ๋ ํฌ๊ฒ ํด์ฃผ๋ ๊ฒ์ด ์ข๋ค.

์ Topic์ ์ฌ๋ฌ๊ฐ์ Partition ์ผ๋ก ๋๋๊น?
์นดํ์นด์ ํ ํฝ์ ๋ฉ์ธ์ง๊ฐ ์ฐ์ฌ์ง๋ ๊ฒ๋ ์ด๋์ ๋ ์๊ฐ์ด ์๋น๋๋๋ฐ ๋ง์ฝ ๋ช ์ฒ๊ฑด์ ๋ฉ์ธ์ง๊ฐ ๋์์ ์นดํ์นด์ write ๋๋ฉด ๋ณ๋ชฉํ์์ด ๋ฐ์ํ ๊ฒ์ด๋ค. ๋ฐ๋ผ์ ์ฌ๋ฌ๊ฐ์ Partition์ผ๋ก ๋๋์ด ์ฒ๋ฆฌํ๋ฉด ์ฅ์ ์ด ๋ง๋ค.
๋ ๊ฐ์ ๊ฒฝ์ฐ ๋จ์ผ ํ ํฝ์ 5๊ฐ์ ํํฐ์ ์ ๋์๋ค.
| roomId | hash(roomId) % 5 | ํํฐ์ | ์ค๋ช |
|---|---|---|---|
| 101 | 1 | 1 | 101 ํด์ ๊ฒฐ๊ณผ๊ฐ 1 โ ํํฐ์ 1์ ์ ์ฅ |
| 202 | 2 | 2 | 202 ํด์ ๊ฒฐ๊ณผ๊ฐ 2 โ ํํฐ์ 2์ ์ ์ฅ |
| 303 | 3 | 3 | 303 ํด์ ๊ฒฐ๊ณผ๊ฐ 3 โ ํํฐ์ 3์ ์ ์ฅ |
| 404 | 4 | 4 | 404 ํด์ ๊ฒฐ๊ณผ๊ฐ 4 โ ํํฐ์ 4์ ์ ์ฅ |
| 505 | 0 | 0 | 505 ํด์ ๊ฒฐ๊ณผ๊ฐ 0 โ ํํฐ์ 0์ ์ ์ฅ |
| 606 | 1 | 1 | 606 ํด์ ๊ฒฐ๊ณผ๊ฐ 1 โ ํํฐ์ 1์ ์ ์ฅ |
roomId ๊ฐ์ ํํฐ์
(์: ํด์ ๊ฒฐ๊ณผ๋ก ํํฐ์
1)์ ๋ชจ์ ์์ฐจ์ ์ผ๋ก ์ฒ๋ฆฌ๊ฐ ๊ฐ๋ฅํ๋ค.Kafka ํ๋ก๋์๋ ๊ธฐ๋ณธ์ ์ผ๋ก ํค(key)๊ฐ ์๋ ๋ฉ์์ง๋ฅผ ๋ณด๋ผ ๋, ์ค์ ๋ ํํฐ์ ์๋ฅผ ๊ธฐ์ค์ผ๋ก ๋ผ์ด๋ ๋ก๋น(RoundโRobin) ๋ฐฉ์์ผ๋ก ์์ฐจ ํ ๋นํ๋ค. ๋ฐ๋ผ์ ํ๋์ ํํฐ์ ๋ด์์๋ ๋ฉ์ธ์ง ์์๊ฐ ๋ณด์ฅ๋์ง๋ง, ํํฐ์ ์ด ์ฌ๋ฌ๊ฐ์ผ ๊ฒฝ์ฐ์๋ ์์๊ฐ ๋ณด์ฅ์๋๋ค.
๋๋ ํค(key) ๊ธฐ๋ฐ ํํฐ์
๋์ ์ฌ์ฉํ๋ฉด, ํ๋ก๋์๊ฐ ๋ฉ์์ง๋ฅผ ๋ณด๋ผ ๋ roomId๋ฅผ ํค๋ก ์ง์ ํ๋ค.
์ฆ, โํค๋ฅผ roomId๋ก ์ค์ ํ๋คโ๋ ๊ฑด, ๋์ผํ ๋ฐฉ์ ๋ฉ์์ง๋ผ๋ฆฌ ์ ์ฉ ํํฐ์
์ ๋ชจ์ ์์๋ฅผ ๋ณด์ฅํ๋ผ๋ ๋ป์ด๋ค.
ํ์ง๋ง ๋ค๋ง, ํ๋ฒ ๋๋ฆฐ ํํฐ์ ์ ์ ๋ ์ค์ผ ์ ์๊ธฐ ๋๋ฌธ์ ์ฒ์์ ๋ง์ ๊ณ ๋ฏผ์ ํ๊ณ ์ค๊ณ๋ฅผ ํด์ผํ๋ค.
| Partition | ํ ๋น๋ Consumer |
|---|---|
| 0 | Consumer 1 |
| 1 | Consumer 2 |
| 2 | Consumer 3 |
| 3 | Consumer 4 |
| 4 | Consumer 5 |


๊ฐ ํํฐ์ ์ Consumer Group ๋ด ํ๋์ Consumer์๊ฒ๋ง ํ ๋น๋จ โ ๋ณ๋ ฌ ์ฒ๋ฆฌ๊ฐ ๊ฐ๋ฅ. ์ฒ๋ฆฌ ์๋ ํฅ์
๊ฐ์ Consumer Group์ ์ํ ์ปจ์๋จธ๋ค์ ์ค๋ณต ์์ด ๋ฐ์ดํฐ ์๋น
๋ง์ฝ Consumer ์ > Partition ์๋ฉด, ์ผ๋ถ Consumer๋ ๋๊ฒ ๋จ
Consumer Group์ด ๋ค๋ฅด๋ฉด ๋ชจ๋ ๊ทธ๋ฃน์ด ๋ ๋ฆฝ์ ์ผ๋ก ๋์ผ ๋ฐ์ดํฐ ์๋น ๊ฐ๋ฅ
ํ ๊ฐ์ Consumer๋ ์ฌ๋ฌ ๊ฐ์ ํ ํฝ์ ์ฒ๋ฆฌ ๊ฐ๋ฅ
์ด๋ฒ ํ๋ก์ ํธ์์ ๋๋ ์ค์๊ฐ์ผ๋ก ์ฑํ ํ ์์๋ ์์คํ ์ ๋ง๋ค์ด๋ณผ๋ ค๊ณ ํ๋ค. ๋ฐ๋ผ์ ์์ง ๊ฐ๋ ์ ๋ถ์กฑํ์ง๋ง ๊ทธ๋๋ ์ฑ๊ณต(?)์ ํ์ผ๋ ๊ธฐ๋กํด๋ณผ๋ ค๊ณ ํ๋ค. ์๋ง ์๋ชป๋ ๋ถ๋ถ๋ ๋ง์์ ์ฐจํ์ ์์ ์ ํด์ผํ ๊ฒ ๊ฐ๋ค.
WebSocket/STOMP
@Configuration
@RequiredArgsConstructor
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
private final StompHandler stompHandler;
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.setApplicationDestinationPrefixes("/send");
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry
.addEndpoint("/ws-stomp")
.setAllowedOriginPatterns("http://localhost:3000")
.withSockJS();
}
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.interceptors(stompHandler);
}
}
์คํ๋ง ๋ด์ฅ ๋ฉ๋ชจ๋ฆฌ ๋ธ๋ก์ปค์ธ simpleBroker("/topic") ์ค์ ์ ์ ๊ฑฐํ์๋ค. ์ด์ ๋ฉ์์ง ๋ธ๋ก์ปค๋ก์ ์คํ๋ง์ ๊ธฐ๋ณธ Simple Broker ๋์ Kafka๋ฅผ ์ฌ์ฉํ๊ธฐ ๋๋ฌธ์ด๋ค. Kafka๋ฅผ ํตํด ๋ฉ์์ง๋ฅผ ๋ฐํ(Publish)ํ๊ณ ๊ตฌ๋ (Subscribe)ํ๋ ๊ตฌ์กฐ๋ก ์ ํํจ์ ๋ฐ๋ผ, ๋ ์ด์ enableSimpleBroker() ์ค์ ์ด ํ์ํ์ง ์๋ค.
KafkaProducerConfig
@Configuration
public class KafkaProducerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
public Map<String, Object> producerConfig() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, ChatRequest> chatRequestProducerFactory() {
return new DefaultKafkaProducerFactory<>(
producerConfig(),
new StringSerializer(),
new JsonSerializer<ChatRequest>()
);
}
@Bean
public KafkaTemplate<String, ChatRequest> chatRequestKafkaTemplate(
ProducerFactory<String, ChatRequest> chatRequestProducerFactory) {
return new KafkaTemplate<>(chatRequestProducerFactory);
}
}
KafkaProducer
@Component
@RequiredArgsConstructor
public class KafkaProducer {
private final KafkaTemplate<String, ChatRequest> chatRequestKafkaTemplate;
public void send(String topic, String roomId, ChatRequest message) {
chatRequestKafkaTemplate.send(topic, roomId, message);
}
}
ChatController
@Controller
@RequiredArgsConstructor
public class ChatController {
private final KafkaProducer kafkaProducer;
@MessageMapping("/chat/{roomId}")
public void sendChat(
ChatRequest incoming,
@DestinationVariable("roomId") Long roomId,
Principal principal
) {
ChatMemberDetails userDetails = getUserDetails(
(UsernamePasswordAuthenticationToken) principal);
Long memberId = userDetails.getMemberId();
String userName = userDetails.getNickName();
ChatRequest enriched = createChatRequest(incoming, memberId, userName);
kafkaProducer.send(
"chat-messages",
roomId.toString(),
enriched
);
}
1. ํด๋ผ์ด์ธํธ โ WebSocket ์ ์ก
/send/chat/{roomId}๋ก STOMP ๋ฉ์์ง๋ฅผ ๋ณด๋ธ๋ค.@MessageMapping("/chat/{roomId}")์ ๋๋ฌํ๋ค.@DestinationVariable("roomId") Long roomId : ๊ฒฝ๋ก์ {roomId} ๊ฐ์ ์ถ์ถํ์ฌ ์ฑํ
๋ฐฉ ์๋ณ์๋ก ์ฌ์ฉ2. Spring Controller ์์
KafkaConsumerConfig
@Configuration
public class KafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
public Map<String, Object> consumerConfig(String groupId) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
return props;
}
@Bean
public ConsumerFactory<String, ChatRequest> chatRequestConsumerFactory() {
JsonDeserializer<ChatRequest> deserializer = new JsonDeserializer<>(ChatRequest.class);
deserializer.addTrustedPackages("yuhan.pro.chatserver.domain.dto");
String chatGroup = "${spring.application.name}-${random.uuid}";
return new DefaultKafkaConsumerFactory<>(
consumerConfig(chatGroup),
new StringDeserializer(),
deserializer
);
}
@Bean(name = "chatMessageFactory")
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, ChatRequest>> factory(
ConsumerFactory<String, ChatRequest> chatRequestConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, ChatRequest> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(chatRequestConsumerFactory);
factory.setConcurrency(5);
return factory;
}
bootstrapServers: application.yml ๋ฑ์ ์ ์๋ Kafka ํด๋ฌ์คํฐ ์ฃผ์
ํค ์ญ์ง๋ ฌํ: ๋ฉ์์ง ํค๋ฅผ StringDeserializer๋ก
์คํ์
๋ฆฌ์
: ์ปจ์๋จธ ์์ ์์ latest๋ก ์ค์ ํด, ๊ฐ์ฅ ๋ง์ง๋ง(์ต์ ) ๋ฉ์์ง๋ถํฐ ์ฝ๊ธฐ
-> earliest๋
์ปจ์๋จธ๊ฐ ์ฒ์ ํฉ๋ฅํ๊ฑฐ๋ ์ ํจํ ์คํ์
์ด ์์ ๋, ํ ํฝ์ ๊ฐ์ฅ ์ฒ์๋ถํฐ(๊ฐ์ฅ ์ค๋๋ ๋ฉ์์ง) ์ฝ๊ธฐ ์์
| ์ํฉ | ๊ถ์ฅ ์ค์ |
|---|---|
| ์ค์๊ฐ ๋ฐ์ดํฐ ์ฒ๋ฆฌ๋ง ์ค์ํ๊ณ ๊ณผ๊ฑฐ ์ด๋ ฅ์ ํ์ ์์ ๋ | latest |
| ์๋น์ค ์ด๊ธฐํ ์ ํ ํฝ ์ ์ฒด ์ด๋ ฅ์ ๋ก๋ํ๋ฉด์ ์ํ๋ฅผ ๊ตฌ์ถํ ๋ | earliest |
| ์ด๋ฏธ ์ฒ๋ฆฌ๋ ๋ฉ์์ง๊ฐ ๋ค์ ์ฌ์ฒ๋ฆฌ๋๋ ๊ฒ์ ํผํ๊ณ ์ถ์ ๋ | latest |
| ์ฌ์์ ์์๋ ๊ณผ๊ฑฐ ์ฒ๋ฆฌํ์ง ๋ชปํ ๋ฉ์์ง๋ถํฐ ์ด์ด์ ์ฒ๋ฆฌํ๊ณ ์ถ์ ๋ | earliest |
KafkaListeners
@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaListeners {
private final SimpMessageSendingOperations messagingTemplate;
private final ChatService chatService;
@KafkaListener(
topics = "chat-messages",
groupId = "${spring.application.name}-${random.uuid}",
containerFactory = "chatMessageFactory"
)
public void handleChatMessage(
ChatRequest message,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.RECEIVED_KEY) String roomId
) {
try {
log.info("Kafka ์์ - roomId: {}, partition: {}, message: {}", roomId, partition, message);
messagingTemplate.convertAndSend("/topic/rooms/" + roomId, message);
chatService.saveChat(message, Long.valueOf(roomId));
} catch (Exception e) {
log.error("Kafka ๋ฉ์์ง ์ฒ๋ฆฌ ์ค๋ฅ", e);
}
}
topics = "chat-messages"
โ ์ด ๋ฆฌ์ค๋ ๋ฉ์๋๋ chat-messages ํ ํฝ์ ๋ฐํ๋ ๋ชจ๋ ๋ฉ์์ง๋ฅผ ๊ตฌ๋
groupId = "${spring.application.name}-${random.uuid}"
โ ์ธ์คํด์ค๋ณ(ํน์ ์ธ์
๋ณ)๋ก ๊ณ ์ ํ ์ปจ์๋จธ ๊ทธ๋ฃน์ ๋ง๋ค์ด, ๊ฐ ์ ํ๋ฆฌ์ผ์ด์
์ธ์คํด์ค๊ฐ ๋ชจ๋ ๋ฉ์์ง๋ฅผ ๋
๋ฆฝ์ ์ผ๋ก ๋ฐ์ ์ฒ๋ฆฌํ๋๋ก ๋ณด์ฅ
containerFactory = "chatMessageFactory"
โ KafkaConsumerConfig์์ ์ค์ ํ ConcurrentKafkaListenerContainerFactory<String, ChatRequest> ๋น์ ์ฌ์ฉํด, JSON โ ChatRequest ์ญ์ง๋ ฌํ์ ๋ณ๋ ฌ์ฑ ์ค์ (Concurrency = 5)์ ์ ์ฉ
@Header(KafkaHeaders.RECEIVED_KEY) String roomId
kafkaProducer.send("chat-messages", roomId.toString(), enriched);
-> ์ด ๊ฐ์ ํ๋ก๋์(์ฌ๊ธฐ์๋ kafkaProducer.send(โฆ, roomId, enriched))์์ ๋ฉ์์ง๋ฅผ ๋ณด๋ผ ๋ ์ง์ ํ ํค(key) ์ ๋์ผ
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition
-> ํค ๊ธฐ๋ฐ ํํฐ์
๋๋ฅผ ์ฐ๊ณ ์๋ค๋ฉด(๊ธฐ๋ณธ ํํฐ์
๋๋ ํค์ ํด์๊ฐ์ ์ฌ์ฉ) ๊ฐ์ ํค(roomId)์ ๋ํด ํญ์ ๋์ผํ ํํฐ์
์ ์ ํ
WebSocket ๋ธ๋ก๋์บ์คํธ
messagingTemplate.convertAndSend("/topic/rooms/" + roomId, message)
โ STOMP ๊ตฌ๋
๊ฒฝ๋ก /topic/rooms/{roomId}๋ก ๋ค์ด์ ์๋ ๋ชจ๋ WebSocket ํด๋ผ์ด์ธํธ์๊ฒ ChatRequest๋ฅผ ์ ์ก
ํ์ฌ ๋๋ ๋จ์ผ ๋ธ๋ก์ปค(๋ธ๋ก์ปค 1๊ฐ)๋ฅผ ์ฌ์ฉ ์ค์ด์ง๋ง ๋ง์ฝ ๋จ์ผ ๋ธ๋ก์ปค๊ฐ ๋ค์ด๋๋ฉด ํด๋ฌ์คํฐ ์ ์ฒด๊ฐ ๋ง๋น๋๋ค. ๋ฐ๋ผ์ ๋ณดํต ๋ธ๋ก์ปค ์๋ฅผ ๋๋ ค 3๋ ์ ๋๋ก ์ด์ํ๋ ๊ฒ์ด ์ผ๋ฐ์ ์ด๋ค. ํ์ง๋ง ์ด ํ๋ก์ ํธ๋ ์ฌ์ด๋ ํ๋ก์ ํธ๋ก ๊ฐ๋ฐ ๋จ๊ณ์ ์์ด, ํธ์์ Kafka ๋ธ๋ก์ปค๋ฅผ ๋จ์ผ ์ธ์คํด์ค(๋ณต์ ํฉํฐ=1)๋ก๋ง ์ด์ํ๊ณ ์๋ค.

์ฐธ๊ณ :