❓ 동기, 비동기 vs 블록킹, 논블록킹
- 동기, 비동기 : 메서드를 제공하는 곳의 관점
- 동기 - 결과 값이 결정될 때까지 반환하지 않는다.
- 비동기 - 결과 값이 결정되기 전에 일단 반환한다.
- 블록킹, 논블록킹 : 메서드를 호출(사용)하는 곳의 관점
- 블록킹 - 최종 결과를 전달 받기 전까지는 기다려야 한다.
- 논블록킹 - 최종 결과를 전달 받기 전까지 기다리지 않고, 다른 작업을 계속 수행한다.
wget https://archive.apache.org/dist/kafka/3.2.1/kafka_2.13-3.2.1.tgz
tar -xzf [파일.tgz]
/config/server.properties는 Kafka 관련 설정 파일이다.
kafka를 실행하려면, kafka-server-start.bat 파일에 인자 값으로 server.properties
속성을 주어야한다.
############################# Server Basics #############################
# Broker의 ID로 Cluster내 Broker를 구분하기 위해 사용(Unique 값)
broker.id=0
############################# Socket Server Settings #############################
# Broker가 사용하는 호스트와 포트를 지정, 형식은 PLAINTEXT://your.host.name:port 을 사용
listeners=PLAINTEXT://:9092
# Producer와 Consumer가 접근할 호스트와 포트를 지정, 기본값은 listeners를 사용
advertised.listeners=PLAINTEXT://localhost:9092
# 네트워크 요청을 처리하는 Thread의 개수, 기본값 3
num.network.threads=3
# I/O가 생길때 마다 생성되는 Thread의 개수, 기본값 8
num.io.threads=8
# socket 서버가 사용하는 송수신 버퍼 (SO_SNDBUF, SO_RCVBUF) 사이즈, 기본값 102400
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
# 서버가 받을 수 있는 최대 요청 사이즈이며, 서버 메모리가 고갈 되는 것 방지
# JAVA의 Heap 보다 작게 설정해야 함, 기본값 104857600
socket.request.max.bytes=104857600
############################# Log Basics #############################
# 로그 파일을 저장할 디렉터리의 쉼표로 구분할 수 있음
log.dirs=C:/dev/kafka_2.13-2.6.0/logs
# 토픽당 파티션의 수를 의미,
# 입력한 수만큼 병렬처리 가능, 데이터 파일도 그만큼 늘어남
num.partitions=1
# 시작 시 log 복구 및 종료 시 flushing에 사용할 데이터 directory당 Thread 개수
# 이 값은 RAID 배열에 데이터 directory에 대해 증가하도록 권장 됨
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
# 내부 Topic인 "_consumer_offsets", "_transaction_state"에 대한 replication factor
# 개발환경 : 1, 운영할 경우 가용성 보장을 위해 1 이상 권장(3 정도)
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Retention Policy #############################
# 세그먼트 파일의 삭제 주기, 기본값 hours, 168시간(7일)
# 옵션 [ bytes, ms, minutes, hours ]
log.retention.hours=168
# 토픽별로 수집한 데이터를 보관하는 파일
# 세그먼트 파일의 최대 크기, 기본값 1GB
# 세그먼트 파일의 용량이 차면 새로운 파일을 생성
log.segment.bytes=1073741824
# 세그먼트 파일의 삭제 여부를 체크하는 주기, 기본값 5분(보존 정책)
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
# 주키퍼의 접속 정보
# 쉼표(,)로 많은 연결 서버 포트 설정 가능
# 모든 kafka znode의 Root directory
zookeeper.connect=localhost:2181
# 주키퍼 접속 시도 제한시간(time out)
zookeeper.connection.timeout.ms=18000
############################# Group Coordinator Settings #############################
# GroupCoordinator 설정 - 컨슈머 rebalance를 지연시키는 시간
# 개발환경 : 테스트 편리를 위해 0으로 정의
# 운영환경 : 3초의 기본값을 설정하는게 좋음
group.initial.rebalance.delay.ms=0
외부에서 카프카 브로커에 접근하기 위해서는 server.properties의 advertised.listeners를 설정해주어야 한다.
클라우드 플랫폼을 사용하고 있다면, 아래와 같이 서버의 외부 IP 주소를 이용해서 advertised.listeners를 설정한다.
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
**advertised.listeners=PLAINTEXT://[서버 IP 주소]:9092**
추가적으로 GCP에서 9092포트에 대한 방화벽 규칙도 설정한다.
❓카프카 클러스터
- 브로커(카프카 서버)로 이루어진 집합체
- 클러스터를 구성하는 브로커들끼리 공유되는 데이터를 유지하거나 특별한 조율을 하려면, 코디네이션 어플리케이션이 필요하다.
→ 주키퍼의 필요성 등장
❓분산 코디네이션 서비스
- 분산 처리 시스템의 난제인 ‘정보 공유’ 문제를 해결하기 위한 서비스
- 역할
- 하위 노드들의 상태 체크
- 자원 점유 문제 해결 (Lock, Unlock 관리)
- 장애 전파 특성 : 코디네이션 시스템의 장애는 전체 시스템 장애로 이어진다.
bin/zookeeper-server-start.sh -daemon config/zookeeper.propertie
bin/kafka-server-start.sh -daemon config/server.properties
bin/kafka-topics.sh --create --topic [토픽 이름] --bootstrap-server localhost:9092
--create
: 생성--topic
: topic 이름--bootstrap-server
: 연결한 카프카 서버 주소--partitions
: 생성하는 토픽의 파티션 수--replication-factor
: 생성하는 토픽의 각 파티션의 replication-factor 개수bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
bin/kafka-console-producer.sh --topic [TOPICNAME] --bootstrap-server localhost:9092
bin/kafka-console-consumer.sh --topic [TOPICNAME] --from-beginning --bootstrap-server localhost:9092
실행 결과는 아래와 같다.
프로듀서는 토픽에 메시지를 전송하고, 컨슈머는 전달된 메시지를 수신한다.