메시지 브로커(Message Broker)는 분산 시스템에서 빠질 수 없는 요소다.
서비스끼리 직접 통신하면 장애나 부하에 취약하기 때문에, 중간에 메시지를 받아주는 중재자가 필요하다.
대표적인 도구가 바로 RabbitMQ와 Kafka다.
둘 다 “메시지를 Producer가 넣고, Consumer가 읽는다”는 점에서 메시지 큐잉(Message Queuing) 의 공통점을 갖지만, 철학과 사용처는 전혀 다르다.
RabbitMQ는 오래된 전통 메시지 브로커다.
👉 주로 쓰이는 곳
RabbitMQ는 **“메시지를 원하는 곳에 정확히 배달하는 것”**에 강하다.
Kafka는 철학이 다르다.
👉 주로 쓰이는 곳
Kafka는 **“데이터 스트림을 중심으로 여러 시스템을 연결하는 허브”**에 강하다.
| 구분 | RabbitMQ | Kafka |
|---|---|---|
| 철학 | 메시지 큐 (Queue) | 분산 로그 스트리밍 플랫폼 |
| 메시지 보관 | 큐에서 소비 후 삭제 | retention 기간 동안 로그 보관 |
| 처리량 | 초당 수천~수만 건 | 초당 수십만~수백만 건 |
| 확장성 | 제한적 | Partition 기반 수평 확장 |
| 라우팅 | 강력 (Exchange, Routing Key) | 단순 (Topic/Partition) |
| 소비자 | 한 메시지는 한 Consumer | 여러 Consumer Group이 동시에 소비 |
| 활용 사례 | 주문, 결제, 알림, IoT | 로그 수집, 실시간 분석, AI 파이프라인 |
RabbitMQ로는 충분한 경우도 많지만, 아래 상황이라면 Kafka가 사실상 정답이다.
트래픽이 폭발적일 때
데이터를 다시 읽어야 할 때
여러 팀이 같은 데이터를 써야 할 때
데이터 파이프라인 허브가 필요할 때
RabbitMQ와 Kafka는 경쟁 관계라기보다는 용도가 다른 도구다.
실무에서는 두 가지를 함께 쓰기도 한다.
👉 핵심은, “내가 다루려는 데이터의 규모와 성격”을 보고 선택하는 것이다.
아래 그대로 블로그에 붙여도 된다. 운영 기준으로 핵심만 담았다.
replication.factor로 내결함성 확보.controller/broker 역할 분리 가능.server.properties)# 프로세스 역할
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093
# 네트워크
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
inter.broker.listener.name=PLAINTEXT
advertised.listeners=PLAINTEXT://broker-1.mydomain:9092
# 로그/스토리지
log.dirs=/var/lib/kafka-logs
num.partitions=3
log.segment.bytes=1073741824 # 1GiB
log.retention.hours=168 # 7일
log.retention.check.interval.ms=300000
# 복제/안정성
default.replication.factor=3
min.insync.replicas=2
unclean.leader.election.enable=false # 데이터 손실 방지
auto.leader.rebalance.enable=true
초기 포맷:
kafka-storage.sh format --config server.properties --cluster-id <generated-id>
listeners=PLAINTEXT://:9092advertised.listeners=PLAINTEXT://lb.domain:9092# SSL
listeners=SSL://:9094
ssl.keystore.location=/etc/kafka/keystore.jks
ssl.keystore.password=****
ssl.key.password=****
# SASL/SCRAM
listeners=SASL_PLAINTEXT://:9095
sasl.enabled.mechanisms=SCRAM-SHA-512
listener.name.sasl_plaintext.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="app" password="****";
replication.factor=3 기본. AZ 3개면 브로커 3개 필수.kafka-topics.sh --create \
--topic app.logs.v1 \
--partitions 12 \
--replication-factor 3 \
--config cleanup.policy=delete \
--config retention.ms=604800000 \
--config segment.bytes=1073741824 \
--bootstrap-server broker-1:9092
로그 압축형(KV 최신값 유지):
--config cleanup.policy=compact \
--config min.cleanable.dirty.ratio=0.5 \
--config segment.ms=3600000
bootstrap.servers=broker-1:9092,broker-2:9092,broker-3:9092
acks=all # ISR 모두가 기록해야 성공
enable.idempotence=true # 중복 방지(필수)
max.in.flight.requests.per.connection=1 # 순서 보장 필요 시 1
retries=2147483647
compression.type=lz4 # 네트워크 절약, 지연 감소
linger.ms=5 # 배치 형성
batch.size=131072 # 128KiB
delivery.timeout.ms=120000
트랜잭션(정확히 한 번, EOS)
enable.idempotence=true
transactional.id=app-tx-001 # 인스턴스별 고유
producer.initTransactions();
producer.beginTransaction();
producer.send(...);
producer.commitTransaction(); // 실패 시 abortTransaction()
bootstrap.servers=broker-1:9092,broker-2:9092,broker-3:9092
group.id=app-consumer
enable.auto.commit=false # 수동 커밋 권장
auto.offset.reset=earliest # 신규 그룹 시작점
max.poll.records=500
max.poll.interval.ms=300000
session.timeout.ms=10000
heartbeat.interval.ms=3000
fetch.min.bytes=1
fetch.max.bytes=52428800 # 50MiB
max.partition.fetch.bytes=5242880 # 5MiB
isolation.level=read_committed # 트랜잭션 사용 시
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor
정확한 커밋 패턴
commitSync() 또는 배치 단위 커밋.정적 멤버십
group.instance.id=app-c1 # 리스타트에도 동일 멤버로 간주 → 리밸런스 감소
max.poll.interval.ms 증가, 처리 시간을 줄이거나, CooperativeSticky로 교체.log.segment.bytes 1–2GiB 권장. 세그먼트가 너무 작으면 파일 핸들↑.noatime 마운트 고려.version: "3.8"
services:
kafka:
image: confluentinc/cp-kafka:7.6.1
environment:
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_NODE_ID: 1
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_LOG_DIRS: /var/lib/kafka/data
KAFKA_NUM_PARTITIONS: 3
KAFKA_DEFAULT_REPLICATION_FACTOR: 1
KAFKA_MIN_INSYNC_REPLICAS: 1
ports: ["9092:9092"]
volumes: ["kdata:/var/lib/kafka/data"]
volumes: { kdata: {} }
# 토픽 목록/상세
kafka-topics.sh --list --bootstrap-server b1:9092
kafka-topics.sh --describe --topic app.logs.v1 --bootstrap-server b1:9092
# 설정 변경(실행 중)
kafka-configs.sh --alter --bootstrap-server b1:9092 \
--entity-type topics --entity-name app.logs.v1 \
--add-config retention.ms=259200000
# 컨슈머 랙 확인
kafka-consumer-groups.sh --bootstrap-server b1:9092 \
--group app-consumer --describe
application.yml
spring:
kafka:
bootstrap-servers: b1:9092,b2:9092,b3:9092
producer:
acks: all
retries: 2147483647
enable-idempotence: true
compression-type: lz4
batch-size: 131072
linger-ms: 5
consumer:
group-id: app-consumer
enable-auto-commit: false
auto-offset-reset: earliest
max-poll-records: 500
properties:
partition.assignment.strategy: org.apache.kafka.clients.consumer.CooperativeStickyAssignor
isolation.level: read_committed
Listener + 수동 커밋
@KafkaListener(topics = "app.logs.v1", containerFactory = "kafkaListenerContainerFactory")
public void onMessage(List<ConsumerRecord<String,String>> records, Acknowledgment ack) {
// 배치 처리
process(records);
ack.acknowledge(); // 배치 커밋
}
컨테이너 팩토리
ConcurrentKafkaListenerContainerFactory<String,String> f = new ConcurrentKafkaListenerContainerFactory<>();
f.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
f.setConcurrency(6); // 파티션 수 이하로
acks=all + min.insync.replicas>=2 + unclean.leader.election.enable=false.max.in.flight.requests.per.connection=1.URP=0 확인 후 다음 브로커.