메시지 브로커 & Kafka

HelloPong·2025년 8월 28일

공부

목록 보기
27/39
post-thumbnail

RabbitMQ vs Kafka — 같은 듯 다른 메시지 브로커 이야기

메시지 브로커(Message Broker)는 분산 시스템에서 빠질 수 없는 요소다.
서비스끼리 직접 통신하면 장애나 부하에 취약하기 때문에, 중간에 메시지를 받아주는 중재자가 필요하다.
대표적인 도구가 바로 RabbitMQKafka다.

둘 다 “메시지를 Producer가 넣고, Consumer가 읽는다”는 점에서 메시지 큐잉(Message Queuing) 의 공통점을 갖지만, 철학과 사용처는 전혀 다르다.


🐇 RabbitMQ — 메시지 큐의 정석

RabbitMQ는 오래된 전통 메시지 브로커다.

  • 큐(Queue) 에 메시지를 쌓고, 소비자가 읽으면 큐에서 사라진다.
  • Exchange를 통해 라우팅이 가능하다 (Direct, Fanout, Topic 등).
  • Ack/Nack, Dead Letter Queue 같은 기능으로 메시지를 안전하게 다룬다.

👉 주로 쓰이는 곳

  • 주문 이벤트 → 결제, 배송, 알림 서비스로 각각 라우팅
  • 금융 거래 이벤트 처리
  • IoT 센서에서 들어오는 소규모 메시지

RabbitMQ는 **“메시지를 원하는 곳에 정확히 배달하는 것”**에 강하다.


🐘 Kafka — 데이터 스트리밍의 표준

Kafka는 철학이 다르다.

  • 메시지가 큐에서 사라지지 않고, 디스크(Log) 에 일정 기간 보관된다.
  • 데이터를 토픽(Topic) → 파티션(Partition) 으로 나눠 병렬 저장한다.
  • 하나의 토픽을 여러 Consumer Group이 동시에 소비할 수 있다.

👉 주로 쓰이는 곳

  • 웹/앱 클릭 로그 수집 (초당 수십만~수백만 건)
  • 실시간 대시보드 업데이트
  • Fraud Detection, 추천 시스템, AI 학습 데이터 파이프라인
  • Kafka Connect로 S3/HDFS/DW에 자동 적재

Kafka는 **“데이터 스트림을 중심으로 여러 시스템을 연결하는 허브”**에 강하다.


📊 RabbitMQ vs Kafka 비교표

구분RabbitMQKafka
철학메시지 큐 (Queue)분산 로그 스트리밍 플랫폼
메시지 보관큐에서 소비 후 삭제retention 기간 동안 로그 보관
처리량초당 수천~수만 건초당 수십만~수백만 건
확장성제한적Partition 기반 수평 확장
라우팅강력 (Exchange, Routing Key)단순 (Topic/Partition)
소비자한 메시지는 한 Consumer여러 Consumer Group이 동시에 소비
활용 사례주문, 결제, 알림, IoT로그 수집, 실시간 분석, AI 파이프라인

✅ 언제 Kafka가 필요한가?

RabbitMQ로는 충분한 경우도 많지만, 아래 상황이라면 Kafka가 사실상 정답이다.

  1. 트래픽이 폭발적일 때

    • 초당 수십만 건 이상의 로그/이벤트가 꾸준히 들어올 때
  2. 데이터를 다시 읽어야 할 때

    • Kafka는 retention 기간 동안 데이터를 저장 → 장애 복구, 모델 재학습에 유리
  3. 여러 팀이 같은 데이터를 써야 할 때

    • Fraud Detection, 통계 집계, 알림 발송을 동시에 실행
  4. 데이터 파이프라인 허브가 필요할 때

    • Kafka Connect로 S3, HDFS, DW, Elasticsearch 등 다양한 곳으로 자동 배포

🎯 결론

RabbitMQ와 Kafka는 경쟁 관계라기보다는 용도가 다른 도구다.

  • RabbitMQ: 업무 이벤트, 주문/결제, 소규모 메시지 라우팅에 적합
  • Kafka: 대규모 로그 스트리밍, 데이터 파이프라인 허브, 실시간 분석에 적합

실무에서는 두 가지를 함께 쓰기도 한다.

  • RabbitMQ → 주문 이벤트 전달
  • Kafka → 로그/클릭/IoT 데이터 스트리밍

👉 핵심은, “내가 다루려는 데이터의 규모와 성격”을 보고 선택하는 것이다.

메시지 브로커→카프카 실전 설정 가이드

아래 그대로 블로그에 붙여도 된다. 운영 기준으로 핵심만 담았다.


🧩 카프카 핵심 구조 요약

  • 브로커: 메시지 저장·복제·전달.
  • 토픽: 파티션 단위로 병렬 처리.
  • 복제: replication.factor로 내결함성 확보.
  • 컨슈머 그룹: 파티션을 그룹 내 인스턴스에 분배.
  • KRaft 모드 권장: ZooKeeper 제거, controller/broker 역할 분리 가능.

🏗️ 클러스터/KRaft 기본 설정 (server.properties)

# 프로세스 역할
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093

# 네트워크
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
inter.broker.listener.name=PLAINTEXT
advertised.listeners=PLAINTEXT://broker-1.mydomain:9092

# 로그/스토리지
log.dirs=/var/lib/kafka-logs
num.partitions=3
log.segment.bytes=1073741824          # 1GiB
log.retention.hours=168                # 7일
log.retention.check.interval.ms=300000

# 복제/안정성
default.replication.factor=3
min.insync.replicas=2
unclean.leader.election.enable=false   # 데이터 손실 방지
auto.leader.rebalance.enable=true

초기 포맷: kafka-storage.sh format --config server.properties --cluster-id <generated-id>


🛰️ 네트워크/광고 주소 패턴

  • 내부 POD/서버용: listeners=PLAINTEXT://:9092
  • 외부 접근 필요 시: advertised.listeners=PLAINTEXT://lb.domain:9092
  • 브로커별 고정 호스트네임 필수. DNS 불안정하면 컨슈머 재접속 지연 발생.

🔐 보안(선택)

# SSL
listeners=SSL://:9094
ssl.keystore.location=/etc/kafka/keystore.jks
ssl.keystore.password=****
ssl.key.password=****
# SASL/SCRAM
listeners=SASL_PLAINTEXT://:9095
sasl.enabled.mechanisms=SCRAM-SHA-512
listener.name.sasl_plaintext.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="app" password="****";

🧮 토픽 설계 체크리스트

  • 파티션 수 = 목표 처리량(RPS) ÷ 단일 컨슈머 처리량 ≈ 여유 30% 포함.
  • replication.factor=3 기본. AZ 3개면 브로커 3개 필수.
  • 키 설계: 같은 키 → 같은 파티션. 순서 보장 필요 시 키 일관성 강제.
  • 대용량 로그는 압축(compaction) 또는 보관(retention) 중 선택.

토픽 수준 설정 예시

kafka-topics.sh --create \
  --topic app.logs.v1 \
  --partitions 12 \
  --replication-factor 3 \
  --config cleanup.policy=delete \
  --config retention.ms=604800000 \
  --config segment.bytes=1073741824 \
  --bootstrap-server broker-1:9092

로그 압축형(KV 최신값 유지):

--config cleanup.policy=compact \
--config min.cleanable.dirty.ratio=0.5 \
--config segment.ms=3600000

🚚 프로듀서 설정 베스트프랙티스

bootstrap.servers=broker-1:9092,broker-2:9092,broker-3:9092
acks=all                    # ISR 모두가 기록해야 성공
enable.idempotence=true     # 중복 방지(필수)
max.in.flight.requests.per.connection=1  # 순서 보장 필요 시 1
retries=2147483647
compression.type=lz4        # 네트워크 절약, 지연 감소
linger.ms=5                 # 배치 형성
batch.size=131072           # 128KiB
delivery.timeout.ms=120000

트랜잭션(정확히 한 번, EOS)

enable.idempotence=true
transactional.id=app-tx-001   # 인스턴스별 고유
producer.initTransactions();
producer.beginTransaction();
producer.send(...);
producer.commitTransaction(); // 실패 시 abortTransaction()

🧲 컨슈머 설정 베스트프랙티스

bootstrap.servers=broker-1:9092,broker-2:9092,broker-3:9092
group.id=app-consumer
enable.auto.commit=false           # 수동 커밋 권장
auto.offset.reset=earliest         # 신규 그룹 시작점
max.poll.records=500
max.poll.interval.ms=300000
session.timeout.ms=10000
heartbeat.interval.ms=3000
fetch.min.bytes=1
fetch.max.bytes=52428800           # 50MiB
max.partition.fetch.bytes=5242880  # 5MiB
isolation.level=read_committed     # 트랜잭션 사용 시
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor

정확한 커밋 패턴

  • 처리 성공 후 commitSync() 또는 배치 단위 커밋.
  • 재처리 가능 설계면 at-least-once. 중복 민감하면 프로듀서 EOS + 컨슈머 idempotent sink.

정적 멤버십

group.instance.id=app-c1   # 리스타트에도 동일 멤버로 간주 → 리밸런스 감소

🧵 리밸런스/스케일 전략

  • 리밸런스 잦으면: max.poll.interval.ms 증가, 처리 시간을 줄이거나, CooperativeSticky로 교체.
  • 인스턴스 수 ≤ 파티션 수. 초과 인스턴스는 유휴.
  • 장기 블로킹 처리 금지. 폴링 스레드와 처리 스레드 분리.

🧱 저장/세그먼트/보관 튜닝

  • log.segment.bytes 1–2GiB 권장. 세그먼트가 너무 작으면 파일 핸들↑.
  • 주기적 압축(compaction)이나 삭제(retention) 윈도우는 업무 RPO 기준으로 산출.
  • 디스크는 XFS/EXT4, 배치 I/O 유리. noatime 마운트 고려.

🧪 로컬 도커 컴포즈 예시(KRaft 단일 노드)

version: "3.8"
services:
  kafka:
    image: confluentinc/cp-kafka:7.6.1
    environment:
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_NODE_ID: 1
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_LOG_DIRS: /var/lib/kafka/data
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_DEFAULT_REPLICATION_FACTOR: 1
      KAFKA_MIN_INSYNC_REPLICAS: 1
    ports: ["9092:9092"]
    volumes: ["kdata:/var/lib/kafka/data"]
volumes: { kdata: {} }

🧰 CLI 운영 커맨드 모음

# 토픽 목록/상세
kafka-topics.sh --list --bootstrap-server b1:9092
kafka-topics.sh --describe --topic app.logs.v1 --bootstrap-server b1:9092

# 설정 변경(실행 중)
kafka-configs.sh --alter --bootstrap-server b1:9092 \
  --entity-type topics --entity-name app.logs.v1 \
  --add-config retention.ms=259200000

# 컨슈머 랙 확인
kafka-consumer-groups.sh --bootstrap-server b1:9092 \
  --group app-consumer --describe

🧩 Spring Kafka 설정 스니펫

application.yml

spring:
  kafka:
    bootstrap-servers: b1:9092,b2:9092,b3:9092
    producer:
      acks: all
      retries: 2147483647
      enable-idempotence: true
      compression-type: lz4
      batch-size: 131072
      linger-ms: 5
    consumer:
      group-id: app-consumer
      enable-auto-commit: false
      auto-offset-reset: earliest
      max-poll-records: 500
      properties:
        partition.assignment.strategy: org.apache.kafka.clients.consumer.CooperativeStickyAssignor
        isolation.level: read_committed

Listener + 수동 커밋

@KafkaListener(topics = "app.logs.v1", containerFactory = "kafkaListenerContainerFactory")
public void onMessage(List<ConsumerRecord<String,String>> records, Acknowledgment ack) {
    // 배치 처리
    process(records);
    ack.acknowledge(); // 배치 커밋
}

컨테이너 팩토리

ConcurrentKafkaListenerContainerFactory<String,String> f = new ConcurrentKafkaListenerContainerFactory<>();
f.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
f.setConcurrency(6); // 파티션 수 이하로

📊 모니터링 핵심 지표

  • 브로커: Under-replicated partitions, ISR 크기, Request latency, Disk 사용률.
  • 프로듀서: Record error rate, Retries, Batch size 평균, Compression ratio.
  • 컨슈머: Consumer lag, Rebalance rate, Poll interval, 처리 지연 p95/p99.

🧯 장애/데이터 보호 규칙

  • 쓰기 손실 방지: acks=all + min.insync.replicas>=2 + unclean.leader.election.enable=false.
  • 순서 보장: 파티션 키 고정 + max.in.flight.requests.per.connection=1.
  • 재시도 설계: 프로듀서 무한 재시도 + 백오프. 컨슈머는 DLT(사망 큐) 운용.
  • 롤링 재시작: 한 번에 1브로커. URP=0 확인 후 다음 브로커.

0개의 댓글