이제 Logstash가 보낸 로그를 카프카의 원하는 토픽으로 가게 됩니다.
카프카는 실제 인스턴스에 하나씩 카프카를 설치해서 클러스터를 구성하는게 맞지만
서비스의 규모와 비용을 생각해서 docker-compose를 통해서 클러스터를 구축하였습니다.
카프카에 대한 설명은 저의 블로그에 정리를 했습니다.
🔗
카프카 개념 : https://velog.io/@moon_happy/Kafka%EB%8A%94-%EB%AD%90%EB%9E%8C
주키퍼 개념 :
version: '3.7'
services:
zk1:
container_name: zookeeper1
image: wurstmeister/zookeeper:latest
restart: always
hostname: zk1
ports:
- "2181:2181"
environment:
ZOO_MY_ID: 1
ZOO_SERVERS: server.1=zk1:2888:3888;2181 server.2=zk2:2888:3888;2181 server.3=zk3:2888:3888;2181
volumes:
- "~/zk-cluster/zk1/data:/data"
zk2:
container_name: zookeeper2
image: wurstmeister/zookeeper:latest
restart: always
hostname: zk2
ports:
- "2182:2181"
environment:
ZOO_MY_ID: 2
ZOO_SERVERS: server.1=zk1:2888:3888;2181 server.2=zk2:2888:3888;2181 server.3=zk3:2888:3888;2181
volumes:
- "~/zk-cluster/zk2/data:/data"
zk3:
container_name: zookeeper3
image: wurstmeister/zookeeper:latest
restart: always
hostname: zk3
ports:
- "2183:2181"
environment:
ZOO_MY_ID: 3
ZOO_SERVERS: server.1=zk1:2888:3888;2181 server.2=zk2:2888:3888;2181 server.3=zk3:2888:3888;2181
volumes:
- "~/zk-cluster/zk3/data:/data"
kafka1:
container_name: kafka1
image: wurstmeister/kafka:latest
restart: on-failure
depends_on:
- zk1
- zk2
- zk3
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ADVERTISED_HOST_NAME: IP
BOOTSTRAP_SERVERS: IP:9092,IP:9093, IP:9094
KAFKA_ZOOKEEPER_CONNECT: "zk1:2181,zk2:2182,zk3:2183"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
kafka2:
container_name: kafka2
image: wurstmeister/kafka:latest
restart: on-failure
depends_on:
- zk1
- zk2
- zk3
ports:
- "9093:9092"
volumes:
- /var/run/docker.sock:/var/run/docker.sock
environment:
KAFKA_BROKER_ID: 2
KAFKA_ADVERTISED_HOST_NAME: IP
BOOTSTRAP_SERVERS: IP:9092, IP:9093, IP:9094
KAFKA_ZOOKEEPER_CONNECT: "zk1:2181,zk2:2182,zk3:2183"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
kafka3:
container_name: kafka3
image: wurstmeister/kafka:latest
restart: on-failure
depends_on:
- zk1
- zk2
- zk3
ports:
- "9094:9092"
volumes:
- /var/run/docker.sock:/var/run/docker.sock
environment:
KAFKA_BROKER_ID: 3
KAFKA_ADVERTISED_HOST_NAME: IP
BOOTSTRAP_SERVERS: IP:9092, IP:9093, IP:9094
KAFKA_ZOOKEEPER_CONNECT: "zk1:2181,zk2:2182,zk3:2183"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
kafka-ui:
image: provectuslabs/kafka-ui
container_name: kafka-ui
ports:
- "10000:8080"
restart: always
environment:
- KAFKA_CLUSTERS_0_NAME=local
- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=:IP:9092,IP:9093,IP:9094
- KAFKA_CLUSTERS_0_ZOOKEEPER=zk1:2181,zk2:2182,zk1:2183
주키퍼는 3개로 클러스터를 구성하였습니다.
주키퍼는 분산 시스템에서 사용되는 오픈소스 서비스로, Kafka에서 메타데이터를 관리하는데 사용됩니다.
각각의 주키퍼는 고유한 ID(ZOO_MY_ID)와 다른 인스턴스들에 대한 정보(ZOO_SERVERS)를 가지며, 별도의 포트(2181~2183)에서 실행됩니다.
카프카 브로커는 메세지를 받아서 저장하고 전달하는 역할을 합니다.
여기서도 3개의 컨테이너를 브로커로 생성하고 고가용성을 확보했습니다.
웹 기반 인터페이싱으로 카프카 클러스터 상태를 모니터링하고 메시지 데이터 등을 조회할 수 있습니다.
저는 토픽을 2개로 하고 파티션은 3개 복제계수는 3개로 모든 브로커에 적재될 수 있도록 구성하였습니다.
user2 : 모든 로그가 향하고 컨슈머를 통해서 BigQuery로 향합니다.
BigQuery에서는 분석가가 분석을하고 모델을 만드는 데이터마트의 역할을 하게됩니다.
user3 : Logstash에서 모든 로그중에서 info라는 Key에서 value가 search
또는 movie_detail
이면 user3로 오게됩니다.
이 토픽에 들어온 메세지들은 컨슈머를 통해서 Redis로 가며, Redis에서 집계와 사용자의 기록을 빠르게 보여주기위해서 구축하였습니다.
일반적으로, Kafka를 사용할 때는 Java를 통해 컨슈머를 구현하고, 하나의 토픽을 생성하여 여기에 파티션을 만들어 컨슈머를 분리합니다. 이렇게 하면 메시지의 순서를 보장하면서도 병렬 처리의 장점을 활용할 수 있습니다.
그런데 이 선택은 새로운 도전과 고민을 가져왔습니다.
Python은 GIL(Global Interpreter Lock) 때문에 멀티스레딩이 제한적이며, 일반적으로 싱글 스레드 환경에서 가장 잘 동작합니다.
따라서 Java와 같은 높은 처리량과 낮은 지연 시간을 구현하기 위해서 목적지가 다르게 토픽을 2개로 만들고 2개의 컨슈머를 만들어 하나의 토픽만 집중할 수 있었으며 싱글스레딩 문제를 해결하였습니다.
하지만 이렇게 했을때 메세지들의 순서는 보장되지 않습니다.
프로듀서의 역할을 하는 Logstash에서 브로커에 메세지를 전달할때
파티션을 지정하지 않았기 때문에 라운드 로빈방식으로 3개의 파티션에 균등하게 적재됩니다.이렇게 되면 같은 시간에 전송된 메시지라도 서로 다른 파티션에 저장될 수 있습니다.
이 경우, offset 기준으로 메시지를 가져오는 컨슈머는 파티션 간 메시지 순서를 알 수 없습니다.
하지만 순서를 보장하지 않은 이유는 순서가 상관이 없기 때문입니다.
메세지 순서를 보장하기 위한 방법
- 메시지에 키를 지정하여 파티셔너를 통해 메시지를 적절한 파티션으로 분배합니다.
- 단일 파티션으로 토픽을 생성합니다.
만약 티켓팅이나 품절이 빠른 쇼핑몰이라면 순서가 매우 중요하기 때문에 순서를 보장해야하지만
요번 프로젝트에서 로그를 수집하는 목적은 2가지 였습니다.
그렇기 때문에 순서를 보장하지 않았습니다.