카프카 정복기 1편

한솔·2023년 5월 3일
1

OOO정복기

목록 보기
1/2
post-thumbnail

📌Pub/Sub 모델과 카프카

Pub/Sub 모델 (메시징 시스템)

  • 중간에 메시징 서버(브로커)를 두고 메시지를 송수신하는 모델
  • 구성 요소
    • message : 데이터 단위
    • topic (channel) : 메시지를 종류별로 관리하는 스토리
    • pubilsher (producer) : 메시지를 전달하는 주체
    • subscriber (comsumer) : 메시지를 가져가는 주체

일반 모델과 Pub/Sub 모델 비교

일반적인 통신방법

  • 특징
    • 송신자가 직접 수신자에게 데이터를 전달한다.
  • 장점
    • 빠른 전송 속도
    • 전송 결과를 신속하게 알 수 있다.
  • 단점
    • 특정 개체에 장애가 발생한 경우 메시지를 보내는 쪽에서 대기 처리 등을 개별적으로 해주지 않으면 시스템에 문제가 발생한다.
    • 시스템이 커지면 커질수록 확장성에 있어, 문제가 발생할 수 있다.

Pub/Sub 모델

  • 특징
    • 비동기 메시징 전송 방식
      • 프로듀서는 컨슈머가 메시지를 처리하는 동안, 다른 작업을 수행할 수 있다.
      • 프로듀서는 컨슈머를 기다리지 않고, 계속 토픽에 메시지를 전송한다.
    • 프로듀서는 컨슈머에게 직접 메시지를 전달하지 않고, 중간의 토픽에 전달한다.
    • 토픽에 메시지가 전달되면, 해당 토픽을 구독한 모든 컨슈머에게 메세지가 전달된다.
    • 발신자의 메시지에는 수신자가 정해져 있지 않다. (발신자는 토픽을 기준으로 전송하기 때문이다.)
    • 수신자는 발신자 정보가 없어도 원하는 메시지를 수신할 수 있다. (수신자는 토픽을 기준으로 수신하기 때문이다.)
  • 장점
    • 중앙의 메시징 시스템만 살아 있다면 프로듀서에서 전달된 메시지가 유실되지 않는다.
    • 메시징 시스템을 중심으로 연결되기 때문에 확장성이 용이하다.
  • 단점
    • 메시지가 정확하게 전달되었는지 확인하기 어렵다.
    • 메시지 전달 속도가 빠르지 않다.

카프카의 메시징 시스템

  • 일반적인 Pub/Sub 모델의 문제점 보완
    • 메시지 교환 전달의 신뢰성 관리를 프로듀서와 컨슈머 쪽으로 넘긴다. → 메시지 전달 결과를 확인하기 쉽다.
    • 부하가 많이 걸리는 교환기 기능을 컨슈머가 만들 수 있다. → 메시징 시스템 내에서의 작업량을 줄인다.

❓ 동기, 비동기 vs 블록킹, 논블록킹

  • 동기, 비동기 : 메서드를 제공하는 곳의 관점
    • 동기 - 결과 값이 결정될 때까지 반환하지 않는다.
    • 비동기 - 결과 값이 결정되기 전에 일단 반환한다.
  • 블록킹, 논블록킹 : 메서드를 호출(사용)하는 곳의 관점
    • 블록킹 - 최종 결과를 전달 받기 전까지는 기다려야 한다.
    • 논블록킹 - 최종 결과를 전달 받기 전까지 기다리지 않고, 다른 작업을 계속 수행한다.

📌 카프카 시작하기

카프카 설치

Apache Download Mirrors

  • 파일 다운로드 : wget https://archive.apache.org/dist/kafka/3.2.1/kafka_2.13-3.2.1.tgz
  • 압축 해제 : tar -xzf [파일.tgz]

외부 통신 설정

/config/server.properties는 Kafka 관련 설정 파일이다.
kafka를 실행하려면, kafka-server-start.bat 파일에 인자 값으로 server.properties 속성을 주어야한다.

 
############################# Server Basics #############################
 
# Broker의 ID로 Cluster내 Broker를 구분하기 위해 사용(Unique 값)
broker.id=0
 
############################# Socket Server Settings #############################
 
# Broker가 사용하는 호스트와 포트를 지정, 형식은 PLAINTEXT://your.host.name:port 을 사용
listeners=PLAINTEXT://:9092
 
# Producer와 Consumer가 접근할 호스트와 포트를 지정, 기본값은 listeners를 사용
advertised.listeners=PLAINTEXT://localhost:9092
 
# 네트워크 요청을 처리하는 Thread의 개수, 기본값 3
num.network.threads=3
 
# I/O가 생길때 마다 생성되는 Thread의 개수, 기본값 8
num.io.threads=8
 
# socket 서버가 사용하는 송수신 버퍼 (SO_SNDBUF, SO_RCVBUF) 사이즈, 기본값 102400
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
 
# 서버가 받을 수 있는 최대 요청 사이즈이며, 서버 메모리가 고갈 되는 것 방지
# JAVA의 Heap 보다 작게 설정해야 함, 기본값 104857600
socket.request.max.bytes=104857600
 
############################# Log Basics #############################
 
# 로그 파일을 저장할 디렉터리의 쉼표로 구분할 수 있음
log.dirs=C:/dev/kafka_2.13-2.6.0/logs
 
# 토픽당 파티션의 수를 의미, 
# 입력한 수만큼 병렬처리 가능, 데이터 파일도 그만큼 늘어남
num.partitions=1
 
# 시작 시 log 복구 및 종료 시 flushing에 사용할 데이터 directory당 Thread 개수
# 이 값은 RAID 배열에 데이터 directory에 대해 증가하도록 권장 됨
num.recovery.threads.per.data.dir=1
 
############################# Internal Topic Settings #############################
# 내부 Topic인 "_consumer_offsets", "_transaction_state"에 대한 replication factor
# 개발환경 : 1, 운영할 경우 가용성 보장을 위해 1 이상 권장(3 정도)
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
 
############################# Log Retention Policy #############################
 
# 세그먼트 파일의 삭제 주기, 기본값 hours, 168시간(7일)
# 옵션 [ bytes, ms, minutes, hours ] 
log.retention.hours=168
 
# 토픽별로 수집한 데이터를 보관하는 파일
# 세그먼트 파일의 최대 크기, 기본값 1GB
# 세그먼트 파일의 용량이 차면 새로운 파일을 생성
log.segment.bytes=1073741824
 
# 세그먼트 파일의 삭제 여부를 체크하는 주기, 기본값 5분(보존 정책)
log.retention.check.interval.ms=300000
 
############################# Zookeeper #############################
 
# 주키퍼의 접속 정보
# 쉼표(,)로 많은 연결 서버 포트 설정 가능
# 모든 kafka znode의 Root directory
zookeeper.connect=localhost:2181
 
# 주키퍼 접속 시도 제한시간(time out)
zookeeper.connection.timeout.ms=18000
 
 
############################# Group Coordinator Settings #############################
 
# GroupCoordinator 설정 - 컨슈머 rebalance를 지연시키는 시간
# 개발환경 : 테스트 편리를 위해 0으로 정의
# 운영환경 : 3초의 기본값을 설정하는게 좋음
 group.initial.rebalance.delay.ms=0

외부에서 카프카 브로커에 접근하기 위해서는 server.properties의 advertised.listeners를 설정해주어야 한다.

클라우드 플랫폼을 사용하고 있다면, 아래와 같이 서버의 외부 IP 주소를 이용해서 advertised.listeners를 설정한다.

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().

**advertised.listeners=PLAINTEXT://[서버 IP 주소]:9092**

config/server.properties

  • listeners
    • 카프카 브로커가 내부적으로 바인딩하는 주소
  • advertised.listeners
    • 카프카 프로듀서, 컨슈머에게 노출할 주소
    • 설정하지 않을 경우 디폴트로 listners 설정이 적용된다.

추가적으로 GCP에서 9092포트에 대한 방화벽 규칙도 설정한다.


주키퍼

❓카프카 클러스터

  • 브로커(카프카 서버)로 이루어진 집합체
  • 클러스터를 구성하는 브로커들끼리 공유되는 데이터를 유지하거나 특별한 조율을 하려면, 코디네이션 어플리케이션이 필요하다.
    → 주키퍼의 필요성 등장
  • 주키퍼는 코디네이션 서비스의 한 종류이다.
  • 주키퍼의 역할
    • 설정 관리 (Configuration Management) : 클러스터의 설정 정보를 최신으로 유지한다.
    • 클러스터 관리 (Cluster Management) : 브로커의 추가/제외/상태 변화를 감지해, 정보를 내부의 브로커들에게 공유한다.
  • 주키퍼 앙상블
    • 코디네이션 서비스의 장애 전파 특성을 해결하기 위해, 다수의 주키퍼를 사용하는 클러스터 구조를 말한다.
    • 살아 있는 노드 수가 과반 수 이상 유지되면, 지속적인 서비스가 가능하다. (판단을 쉽게 하기 위해 홀수로 설정한다.)
    • 노드 수가 많을 수록 장애에 견고한 서비스를 제공할 수 있다.

분산 코디네이션 서비스

  • 분산 처리 시스템의 난제인 ‘정보 공유’ 문제를 해결하기 위한 서비스
  • 역할
    • 하위 노드들의 상태 체크
    • 자원 점유 문제 해결 (Lock, Unlock 관리)
  • 장애 전파 특성 : 코디네이션 시스템의 장애는 전체 시스템 장애로 이어진다.

카프카와 주키퍼 : 역할 분리

  • 카프카
    • 브로커의 상태를 저장하지 않고, 메시지를 주고받는 기능에 집중한다.
    • Producer와 Consumer는 브로커의 정보를 알고 있다.
    • 브로커의 상태가 변경되면, 변경된 정보를 주키퍼를 통해 전달받는다.
  • 주키퍼
    • 브로커의 상태를 관리한다. ex) 새로운 토픽이 생성되었을 때, 토픽의 생성과 소비에 대한 상태를 저장한다.

주키퍼, 카프카 실행

  • 주키퍼 실행 : bin/zookeeper-server-start.sh -daemon config/zookeeper.propertie
  • 카프카 실행 : bin/kafka-server-start.sh -daemon config/server.properties

토픽 생성

  • bin/kafka-topics.sh --create --topic [토픽 이름] --bootstrap-server localhost:9092
  • 옵션
    • --create : 생성
    • --topic : topic 이름
    • --bootstrap-server : 연결한 카프카 서버 주소
    • --partitions : 생성하는 토픽의 파티션 수
    • --replication-factor : 생성하는 토픽의 각 파티션의 replication-factor 개수

토픽 생성 확인

  • bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092

통신 테스트

  • 프로듀서 bin/kafka-console-producer.sh --topic [TOPICNAME] --bootstrap-server localhost:9092
  • 컨슈머 bin/kafka-console-consumer.sh --topic [TOPICNAME] --from-beginning --bootstrap-server localhost:9092

실행 결과는 아래와 같다.
프로듀서는 토픽에 메시지를 전송하고, 컨슈머는 전달된 메시지를 수신한다.

profile
나는 N번째 다시 태어났다.

0개의 댓글