[Apache Kafka] 아프카 카파치를 본격적으로 알아보기 이전에 CLI부터 다뤄보겠습니다

Doccimann·2022년 6월 1일
0

Kafka

목록 보기
2/4
post-thumbnail

👉 본격적으로 글을 작성하기 이전에

저희는 우선 카프카에 대해서 본격적으로 알아보기 이전에 카프카를 ec2에 설치해보고 실습해보는 시간부터 가져보려고 합니다.

이 글은 그림이 하나도 없으니 조심하라구.....

우선 EC2에 Apache Kafka를 설치해보도록 하겠습니다!


🔥 Apache Kafka를 EC2에 설치하겠습니다.

Apache Kafka는 분산 코디네이션 서비스를 제공하는 주키퍼와 함께 구동해야하기 때문에 힙 메모리를 넉넉하게 잡아야합니다. 따라서 저희는 프리티어 기준의 t2.micro가 아닌 t2.small 인스턴스를 이용해서 실습하도록 하겠습니다.

우선 Kafka의 경우 Java와 Scalar로 작성이 되어있기 때문에 Java 11버전을 우선 설치하도록 하겠습니다.

# aws coreetto 다운로드
$ sudo curl -L https://corretto.aws/downloads/latest/amazon-corretto-11-x64-linux-jdk.rpm -o jdk11.rpm

# jdk11 설치
$ sudo yum localinstall jdk11.rpm -y

# jdk version 선택
$ sudo /usr/sbin/alternatives --config java

# java 버전 확인
$ java --version

# 다운받은 설치키트 제거
$ rm -rf jdk11.rpm

다음으로 카프카를 설치하겠습니다. 카프카는 2.5.0 버전으로 설치하도록 하겠습니다!

# Apache Kafka 다운로드
$ wget https://archive.apache.org/dist/kafka/2.5.0/kafka_2.12-2.5.0.tgz

# 압축 해제
$ tar xvf kafka_2.12-2.5.0.tgz

# kafka 폴더로 이동
cd kafka_2.12-2.5.0/

다음으로, 카프카 브로커 실행 옵션을 변경하도록 하겠습니다. 저희는 카프카 클러스터를 단일 브로커로 구성해서 사용할 예정이며, 또한 listener의 경우 보안그룹으로 막아버릴 생각이기 때문에 저희는 advertised.listeners 속성만 수정하도록 하겠습니다.

$ vim config/server.properties

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

############################# Socket Server Settings #############################

.
(생략)
.
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://[YOUR HOST IP]:9092

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

.
(생략)
.

이제 주키퍼를 작동시키고, 다음으로 카프카를 작동시키겠습니다.

zookeeper와 kafka 모두 -daemon 설정을 부여해서 백그라운드로 동작하도록 하겠습니다.

👉 커맨드

# zookeeper 실행. 실행시 zookeeper.properties를 전달해야한다
$ bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

# 동작하는지 확인
jps -m

👉 결과

3814 QuorumPeerMain config/zookeeper.properties
3851 Jps -m

이제 카프카 브로커를 실행하도록 하겠습니다.

카프카 브로커를 실행시킬 때 저희가 수정했던 config/server.properties를 전달해서 실행시키면 되겠습니다!

👉 커맨드

# Kafka 실행
$ bin/kafka-server-start.sh --daemon config/server.properties

# Jps를 통해 JVM 위에 Kafka가 올라왔는지 검사하기
$ jps -m

👉 결과

3814 QuorumPeerMain config/zookeeper.properties
4248 Jps -m
4203 Kafka config/server.properties

이제 카프카의 로그를 뜯어보겠습니다.

👉 커맨드

# 로그 확인
tail -f logs/server.log

👉 결과

[2022-06-01 12:40:23,842] INFO [TransactionCoordinator id=0] Starting up. (kafka.coordinator.transaction.TransactionCoordinator)
[2022-06-01 12:40:23,854] INFO [TransactionCoordinator id=0] Startup complete. (kafka.coordinator.transaction.TransactionCoordinator)
[2022-06-01 12:40:23,862] INFO [Transaction Marker Channel Manager 0]: Starting (kafka.coordinator.transaction.TransactionMarkerChannelManager)
[2022-06-01 12:40:24,092] INFO [ExpirationReaper-0-AlterAcls]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2022-06-01 12:40:24,152] INFO [/config/changes-event-process-thread]: Starting (kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread)
[2022-06-01 12:40:24,196] INFO [SocketServer brokerId=0] Started data-plane processors for 1 acceptors (kafka.network.SocketServer)
[2022-06-01 12:40:24,214] INFO Kafka version: 2.5.0 (org.apache.kafka.common.utils.AppInfoParser)
[2022-06-01 12:40:24,214] INFO Kafka commitId: 66563e712b0b9f84 (org.apache.kafka.common.utils.AppInfoParser)
[2022-06-01 12:40:24,214] INFO Kafka startTimeMs: 1654087224196 (org.apache.kafka.common.utils.AppInfoParser)
[2022-06-01 12:40:24,215] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)

이제 EC2에 카프카 브로커가 올라왔습니다!

다음으로 Kafka를 로컬에 설치해서 EC2에 올라온 kafka와 통신하도록 하겠습니다.


🔥 로컬에 Kafka를 설치하겠습니다.

로컬에 카프카를 설치하겠습니다.

👉 커맨드

# 카프카 다운로드
$ curl https://archive.apache.org/dist/kafka/2.5.0/kafka_2.12-2.5.0.tgz --output kafka.tgz

# 카프카 압축해제
$ tar -xvf kafka.tgz

# 폴더 이동
$ cd kafka_2.12-2.5.0/

# 카프카 버전 확인
bin/kafka-broker-api-versions.sh --bootstrap-server [Your Host IP]:9092

👉 결과

[Your Host IP]:9092 (id: 0 rack: null) -> (
	Produce(0): 0 to 8 [usable: 8],
	Fetch(1): 0 to 11 [usable: 11],
	ListOffsets(2): 0 to 5 [usable: 5],
	Metadata(3): 0 to 9 [usable: 9],
	LeaderAndIsr(4): 0 to 4 [usable: 4],
	StopReplica(5): 0 to 2 [usable: 2],
	UpdateMetadata(6): 0 to 6 [usable: 6],
	ControlledShutdown(7): 0 to 3 [usable: 3],
	OffsetCommit(8): 0 to 8 [usable: 8],
	OffsetFetch(9): 0 to 7 [usable: 7],
	FindCoordinator(10): 0 to 3 [usable: 3],
	JoinGroup(11): 0 to 7 [usable: 7],
	Heartbeat(12): 0 to 4 [usable: 4],
	LeaveGroup(13): 0 to 4 [usable: 4],
	SyncGroup(14): 0 to 5 [usable: 5],
	DescribeGroups(15): 0 to 5 [usable: 5],
	ListGroups(16): 0 to 3 [usable: 3],
	SaslHandshake(17): 0 to 1 [usable: 1],
	ApiVersions(18): 0 to 3 [usable: 3],
	CreateTopics(19): 0 to 5 [usable: 5],
	DeleteTopics(20): 0 to 4 [usable: 4],
	DeleteRecords(21): 0 to 1 [usable: 1],
	InitProducerId(22): 0 to 3 [usable: 3],
	OffsetForLeaderEpoch(23): 0 to 3 [usable: 3],
	AddPartitionsToTxn(24): 0 to 1 [usable: 1],
	AddOffsetsToTxn(25): 0 to 1 [usable: 1],
	EndTxn(26): 0 to 1 [usable: 1],
	WriteTxnMarkers(27): 0 [usable: 0],
	TxnOffsetCommit(28): 0 to 3 [usable: 3],
	DescribeAcls(29): 0 to 2 [usable: 2],
	CreateAcls(30): 0 to 2 [usable: 2],
	DeleteAcls(31): 0 to 2 [usable: 2],
	DescribeConfigs(32): 0 to 2 [usable: 2],
	AlterConfigs(33): 0 to 1 [usable: 1],
	AlterReplicaLogDirs(34): 0 to 1 [usable: 1],
	DescribeLogDirs(35): 0 to 1 [usable: 1],
	SaslAuthenticate(36): 0 to 2 [usable: 2],
	CreatePartitions(37): 0 to 2 [usable: 2],
	CreateDelegationToken(38): 0 to 2 [usable: 2],
	RenewDelegationToken(39): 0 to 2 [usable: 2],
	ExpireDelegationToken(40): 0 to 2 [usable: 2],
	DescribeDelegationToken(41): 0 to 2 [usable: 2],
	DeleteGroups(42): 0 to 2 [usable: 2],
	ElectLeaders(43): 0 to 2 [usable: 2],
	IncrementalAlterConfigs(44): 0 to 1 [usable: 1],
	AlterPartitionReassignments(45): 0 [usable: 0],
	ListPartitionReassignments(46): 0 [usable: 0],
	OffsetDelete(47): 0 [usable: 0]
)

🔥 로컬 카프카와 EC2의 카프카와 네트워크 테스트를 해보겠습니다.

kafka-verifiable-producer.shkafka-verifiable-consumer.sh를 이용해서 메시지를 주고받으면서 제대로 통신이 되는지 확인을 해보도록 하겠습니다.

우선 producer측에서 kafka로 메시지를 쏴보도록 하겠습니다.

👉 커맨드

# verifiable-producer을 이용해서 EC2의 카프카에 테스트 메시지 전송
$ bin/kafka-verifiable-producer.sh --bootstrap-server [Your Host IP]:9092 --max-messages 10 --topic verify-test

커맨드를 분석해보도록 하겠습니다. 저희는 EC2를 향해 메시지를 전송해야하기 때문에 로컬은 producer의 입장입니다. 따라서 kafka-verifiable-producer.sh를 이용해야합니다.

그리고 --bootstrap-server 옵션으로 EC2의 호스트 ip, port 정보를 적어줍니다.

그리고 --topic 옵션에 토픽 이름을 써주시면 되는데, 토픽 이름은 verify-test로 보내보도록 하겠습니다.

카프카는 모든 데이터(log)를 토픽 단위로 관리합니다. 그리고 토픽 안에서는 Partition으로 나눠서 데이터를 관리하는 형식입니다. 간단하게 말해서, topic은 RDBMS에서 비유하자면 테이블과 비슷한 역할을 수행한다고 보면 되겠습니다!

👉 결과

{"timestamp":1654088111747,"name":"startup_complete"}
[2022-06-01 21:55:12,034] WARN [Producer clientId=producer-1] Error while fetching metadata with correlation id 1 : {verify-test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2022-06-01 21:55:12,252] WARN [Producer clientId=producer-1] Error while fetching metadata with correlation id 3 : {verify-test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
{"timestamp":1654088112594,"name":"producer_send_success","key":null,"value":"0","offset":0,"partition":0,"topic":"verify-test"}
{"timestamp":1654088112606,"name":"producer_send_success","key":null,"value":"1","offset":1,"partition":0,"topic":"verify-test"}
{"timestamp":1654088112606,"name":"producer_send_success","key":null,"value":"2","offset":2,"partition":0,"topic":"verify-test"}
{"timestamp":1654088112608,"name":"producer_send_success","key":null,"value":"3","offset":3,"partition":0,"topic":"verify-test"}
{"timestamp":1654088112608,"name":"producer_send_success","key":null,"value":"4","offset":4,"partition":0,"topic":"verify-test"}
{"timestamp":1654088112608,"name":"producer_send_success","key":null,"value":"5","offset":5,"partition":0,"topic":"verify-test"}
{"timestamp":1654088112608,"name":"producer_send_success","key":null,"value":"6","offset":6,"partition":0,"topic":"verify-test"}
{"timestamp":1654088112608,"name":"producer_send_success","key":null,"value":"7","offset":7,"partition":0,"topic":"verify-test"}
{"timestamp":1654088112608,"name":"producer_send_success","key":null,"value":"8","offset":8,"partition":0,"topic":"verify-test"}
{"timestamp":1654088112608,"name":"producer_send_success","key":null,"value":"9","offset":9,"partition":0,"topic":"verify-test"}
{"timestamp":1654088112623,"name":"shutdown_complete"}
{"timestamp":1654088112624,"name":"tool_data","sent":10,"acked":10,"target_throughput":-1,"avg_throughput":11.273957158962796}

그리고 consumer를 이용해서 전송된 데이터를 까보도록 하겠습니다.

👉 커맨드

#consumer의 입장에서 메시지를 받아오기
$ bin/kafka-verifiable-consumer.sh --bootstrap-server [Your Host IP]:9092 --topic verify-test --group-id verify-group

👉 결과

{"timestamp":1654088447969,"name":"startup_complete"}
{"timestamp":1654088449676,"name":"partitions_assigned","partitions":[{"topic":"verify-test","partition":0}]}
{"timestamp":1654088449979,"name":"records_consumed","count":10,"partitions":[{"topic":"verify-test","partition":0,"count":10,"minOffset":0,"maxOffset":9}]}
{"timestamp":1654088450026,"name":"offsets_committed","offsets":[{"topic":"verify-test","partition":0,"offset":10}],"success":true}

결과를 자세하게 보시면, verify-group라는 컨슈머 그룹이 verify-test 토픽을 읽었기 때문에 records_consumed, offsets_committed 옵션에 로그가 찍혀있는 모습을 확인할 수 있습니다.

이와 같이, 카프카는 컨슈머가 토픽의 데이터를 읽는 경우에는 offset을 commit시켜서 로그를 찍어줍니다. 이를 이용해서 어느 컨슈머가 어느 오프셋까지 읽었는지도 체크할 수 있음을 유추할 수 있습니다.


🔥 이제 간단하게 토픽을 생성하면서 놀아봅시다(?)

우선 토픽을 생성하도록 하겠습니다.

👉 커맨드

# 토픽에 partition 개수 정보, replication 개수, 
# retention time 정보, 토픽의 이름을 전달하여 생성
$ bin/kafka-topics.sh --create --bootstrap-server [Your Host IP]:9092 --partions 3 --replication-factor 1 --config retention.ms=172800000 --topic hello.kafka

Replication-factor 정보의 경우 토픽을 몇 개의 브로커에 분산 저장할지를 결정하는 옵션입니다. 저희는 카프카 클러스터에 1개의 브로커만 두고 실습을 하고있기 때문에 1로 설정하였습니다.

👉 결과

WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic hello.kafka.

다음으로 토픽이 잘 생성됐는지 검사하겠습니다.

👉 커맨드

# 토픽 리스트 조회
$ bin/kafka-topics.sh --bootstrap-server [Your Host IP]:9092 --list

👉 결과

__consumer_offsets
hello.kafka
verify-test

보시게 되면 저희는 분명히 토픽을 1개만 생성했는데...토픽이 3개가 조회되는 모습을 확인할 수 있습니다.

이유를 알려드리자면, consumer_offsets의 경우 저희가 consumer를 이용해서 topic을 조회하게 되면 조회 기록을 commit을 한다고 알고있습니다. 그런데 이 정보를 consumer_offsets라는 토픽에 저장한다고 알고계시면 되겠습니다.

그리고 verify-test 토픽의 경우 네트워크를 테스트하는 과정에서 사용한 토픽입니다. 저희는 토픽을 의식적으로 생성하지는 않았으나 자동적으로 생성됐다고 유추할 수 있습니다.

다음으로 토픽 옵션을 수정해보겠습니다.

토픽의 옵션에는 정적인 옵션, 동적인 옵션으로 구분이 되는데, 정적인 옵션 (대표적으로 파티션의 개수)를 수정할 때는 kafka-topics.sh를 이용해서 alter 옵션을 통해서 수정이 가능하지만, 동적인 옵션 (대표적으로 retention.ms)의 경우에는 kafka-configs.sh 를 이용해서 수정해야합니다.

그러면 저희는 이번에 hello.kafka 토픽의 파티션을 3개에서 4개로, 그리고 retention.ms를 절반으로 깎아보겠습니다.

👉 커맨드

# 토픽의 partition 개수 수정
$ bin/kafka-topics.sh --bootstrap-server [Your Host IP]:9092 --topic hello.kafka --alter --partitions 4

# 토픽의 정보 조회
$ bin/kafka-topics.sh --bootstrap-server [Your Host IP]:9092 --describe --topic hello.kafka

👉 결과

Topic: hello.kafka	PartitionCount: 4	ReplicationFactor: 1	Configs: segment.bytes=1073741824,retention.ms=172800000
	Topic: hello.kafka	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
	Topic: hello.kafka	Partition: 1	Leader: 0	Replicas: 0	Isr: 0
	Topic: hello.kafka	Partition: 2	Leader: 0	Replicas: 0	Isr: 0
	Topic: hello.kafka	Partition: 3	Leader: 0	Replicas: 0	Isr: 0

다음으로 retention.ms를 수정해보도록 하겠습니다.

👉 커맨드

# retention.ms 수정
$ bin/kafka-configs.sh --bootstrap-server [Your Host IP]:9092 --entity-type topics --entity-name hello.kafka --alter --add-config retention.ms=86400000

# 동적 옵션 조회
$ bin/kafka-configs.sh --bootstrap-server [Your Host IP]:9092 --entity-type topics --entity-name hello.kafka --describe

👉 결과

Dynamic configs for topic hello.kafka are:
  retention.ms=86400000 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:retention.ms=86400000}

잘 수정된 것을 확인할 수 있습니다.

다음으로 토픽에 데이터를 넣어보겠습니다. 이 때는 producer를 이용할 것이며, 첫번째는 메시지 키를 부여하고 않고 데이터를 넣고, 두번째는 메시지 키를 부여한 상태로 데이터를 넣어보도록 하겠습니다.

1️⃣ 메시지 키를 부여하지 않은 상태로 데이터 넣기

👉 커맨드

# 데이터 넣기
$ bin/kafka-console-producer.sh --bootstrap-server [Your Host IP]:9092 --topic hello.kafka

> hello
> kafka

# 토픽 조회

$ bin.kafka-console-consumer.sh --bootstrap-server [Your Host IP]:9092 --topic hello,kafka --from-beginning

👉 조회 결과

kafka
hello

그런데 뭔가 이상합니다. 저희는 분명히 hello kafka 순서로 데이터를 넣었지만, 막상 조회를 해보면 kafka, hello 순서로 출력이 되고있습니다.

이유는 다음과 같습니다.

카프카는 토픽에 데이터를 넣게되면 메시지 키가 존재하지 않는 경우 파티션에 round-robin 방식으로 들어가게됩니다. 그리고 토픽에서 데이터를 가져올 때 파티션으로부터 동일한 중요도(우선순위)를 가지고 데이터를 가져오기 때문에 데이터의 순서성은 보장되지 않는다

그리고 메시지 키를 부여한 채로 데이터를 넣도록 하겠습니다.

2️⃣ 메시지 키를 부여한 상태로 데이터 넣기

👉 커맨드

# 메시지 키를 부여해서 데이터 넣기
$ bin/kafka-console-producer.sh --bootstrap-server [Your Host IP]:9092 --topic hello.kafka --property "parse.key=true" --property "key.separator=:"

# 메시지 키와 같이 토픽의 데이터를 조회
$ bin/kafka-console-consumer.sh --bootstrap-server [Your Host IP]:9092 --topic hello.kafka --property "print.key=true" --property "key.separator=:" --from-beginning

👉 조회 결과

key2:value2
null:kafka
key3:value3
null:hello
key1:value1

카프카 토픽에 데이터를 넣을 때 메시지 키를 부여하는 경우, 키가 특정 해쉬값으로 파싱되어 파티션에 꽂히는 형식을 가집니다. 따라서 메시지 키를 똑같게 해서 데이터를 전송하면 같은 파티션으로 보내진다는 일관성이 보장됩니다.
(파티션이 추가되지 않는 전제 하에)

카프카 토픽에 파티션이 추가가 되는 경우 메시지 키와 파티션의 일대일 대응 관계가 보장되지 않습니다. 따라서 파티션을 추가하고 싶다면 커스텀 파티셔너를 만들어서 운영해야합니다.


🌲 글을 마치며

다음 포스트에서는 카프카에 대해서 더 깊게 알아보고 실습도 해보는 내용을 다뤄보도록 하겠습니다.

긴 글 읽어주셔서 감사합니다!

profile
Hi There 🤗! I'm college student majoring Mathematics, and double majoring CSE. I'm just enjoying studying about good architectures of back-end system(applications) and how to operate the servers efficiently! 🔥

1개의 댓글

comment-user-thumbnail
2023년 10월 27일

로컬 카프카에서도 주키퍼, 카프카 서버를 실행시켜야하나요?

답글 달기