
공식 다운로드 사이트
해당 사이트로 들어가셔서 다운로드 및 압축을 풀어주시면 됩니다.
카프카는 플랫폼 별로 구분되지 않고, 내부적으로 각 OS 마다 다른 명령어를 수행할 수 있도록 구성돼있습니다.
압축을 해제하면 다음과 같은 디렉토리로 구성돼있습니다.




🔖 Kafka 관련 plug-in
- https://cwiki.apache.org/confluence/display/KAFKA/Clients
- Kafka와 데이터를 주고받기 위해 사용하는 Java Library
- Producer, Consumer, Admin, Stream 등 kafka 관련 API를 제공합니다.
- 다양한 3rd party library 존재 : C/C++, Node.js, Python, .NET 등

시나리오 테스트 전 Kafka 서버를 기동해보도록 하겠습니다.
Kafka를 관리하기 위한 코디네이터인 Zookeeper를 먼저 기동해줘야 합니다.
poweshell에 다음과 같은 명령어를 입력해줍니다.
$.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
아래와 같이 2181 port로 zookeeper가 정상 실행됩니다.
[2024-04-08 15:58:23,252] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2024-04-08 15:58:23,262] INFO Using org.apache.zookeeper.server.watch.WatchManager as watch manager (org.apache.zookeeper.server.watch.WatchManagerFactory)
[2024-04-08 15:58:23,262] INFO Using org.apache.zookeeper.server.watch.WatchManager as watch manager (org.apache.zookeeper.server.watch.WatchManagerFactory)
[2024-04-08 15:58:23,263] INFO zookeeper.snapshotSizeFactor = 0.33 (org.apache.zookeeper.server.ZKDatabase)
[2024-04-08 15:58:23,263] INFO zookeeper.commitLogCount=500 (org.apache.zookeeper.server.ZKDatabase)
[2024-04-08 15:58:23,266] INFO zookeeper.snapshot.compression.method = CHECKED (org.apache.zookeeper.server.persistence.SnapStream)
[2024-04-08 15:58:23,277] INFO Reading snapshot \tmp\zookeeper\version-2\snapshot.5b (org.apache.zookeeper.server.persistence.FileSnap)
[2024-04-08 15:58:23,280] INFO The digest in the snapshot has digest version of 2, with zxid as 0x5b, and digest value as 50122508712 (org.apache.zookeeper.server.DataTree)
[2024-04-08 15:58:23,301] INFO ZooKeeper audit is disabled. (org.apache.zookeeper.audit.ZKAuditProvider)
[2024-04-08 15:58:23,301] INFO 130 txns loaded in 12 ms (org.apache.zookeeper.server.persistence.FileTxnSnapLog)
[2024-04-08 15:58:23,302] INFO Snapshot loaded in 38 ms, highest zxid is 0xdd, digest is 268578072849 (org.apache.zookeeper.server.ZKDatabase)
[2024-04-08 15:58:23,302] INFO Snapshotting: 0xdd to \tmp\zookeeper\version-2\snapshot.dd (org.apache.zookeeper.server.persistence.FileTxnSnapLog)
[2024-04-08 15:58:23,304] INFO Snapshot taken in 2 ms (org.apache.zookeeper.server.ZooKeeperServer)
[2024-04-08 15:58:23,314] INFO zookeeper.request_throttler.shutdownTimeout = 10000 ms (org.apache.zookeeper.server.RequestThrottler)
[2024-04-08 15:58:23,314] INFO PrepRequestProcessor (sid:0) started, reconfigEnabled=false (org.apache.zookeeper.server.PrepRequestProcessor)
[2024-04-08 15:58:23,326] INFO Using checkIntervalMs=60000 maxPerMinute=10000 maxNeverUsedIntervalMs=0 (org.apache.zookeeper.server.ContainerManager)
$.\bin\windows\kafka-server-start.bat .\config\server.properties
다음으로 Kafka를 기동해줍니다.
[2024-04-08 16:12:16,030] INFO [ExpirationReaper-0-AlterAcls]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2024-04-08 16:12:16,034] INFO [Controller id=0, targetBrokerId=0] Client requested connection close from node 0 (org.apache.kafka.clients.NetworkClient)
[2024-04-08 16:12:16,079] INFO [/config/changes-event-process-thread]: Starting (kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread)
[2024-04-08 16:12:16,093] INFO [SocketServer listenerType=ZK_BROKER, nodeId=0] Enabling request processing. (kafka.network.SocketServer)
[2024-04-08 16:12:16,094] INFO Awaiting socket connections on 0.0.0.0:9093. (kafka.network.DataPlaneAcceptor)
[2024-04-08 16:12:16,098] INFO Kafka version: 3.7.0 (org.apache.kafka.common.utils.AppInfoParser)
[2024-04-08 16:12:16,098] INFO Kafka commitId: 2ae524ed625438c5 (org.apache.kafka.common.utils.AppInfoParser)
[2024-04-08 16:12:16,098] INFO Kafka startTimeMs: 1712560336095 (org.apache.kafka.common.utils.AppInfoParser)
[2024-04-08 16:12:16,100] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
[2024-04-08 16:12:16,175] INFO [zk-broker-0-to-controller-forwarding-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 0 rack: null) (kafka.server.NodeToControllerRequestThread)
⚠️ Trouble-Shooting
카프카 서버 기동 중 다음과 같은 오류 메시지가 발생했습니다.
[2024-04-05 15:18:24,596] ERROR Unable to start acceptor for ListenerName(PLAINTEXT) (kafka.network.DataPlaneAcceptor) org.apache.kafka.common.KafkaException: Socket server failed to bind to 0.0.0.0:9092: Address already in use: bind. at kafka.network.Acceptor.openServerSocket(SocketServer.scala:734) at kafka.network.Acceptor.liftedTree1$1(SocketServer.scala:637) at kafka.network.Acceptor.start(SocketServer.scala:632) at kafka.network.SocketServer.$anonfun$enableRequestProcessing$2(SocketServer.scala:222) at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) at java.base/java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:887) at java.base/java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2325) at kafka.network.SocketServer.chainAcceptorFuture$1(SocketServer.scala:215) at kafka.network.SocketServer.$anonfun$enableRequestProcessing$5(SocketServer.scala:229) at java.base/java.util.concurrent.ConcurrentHashMap$ValuesView.forEach(ConcurrentHashMap.java:4780) at kafka.network.SocketServer.enableRequestProcessing(SocketServer.scala:229) at kafka.server.KafkaServer.startup(KafkaServer.scala:621) at kafka.Kafka$.main(Kafka.scala:112) at kafka.Kafka.main(Kafka.scala) Caused by: java.net.BindException: Address already in use: bind at java.base/sun.nio.ch.Net.bind0(Native Method) at java.base/sun.nio.ch.Net.bind(Net.java:555) at java.base/sun.nio.ch.ServerSocketChannelImpl.netBind(ServerSocketChannelImpl.java:337) at java.base/sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:294) at java.base/sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:89) at kafka.network.Acceptor.openServerSocket(SocketServer.scala:730) ... 13 more내용을 확인해보니, 9092 port가 이미 사용되고 있다는 내용의 오류 였습니다.
이러한 경우, 다음과 같이 kafka 실행 설정에 들어가서 port 번호를 수정할 수 있습니다.# server.properties 수정 # Broker가 사용하는 호스트와 포트 지정, 형식은 PLAINTEXT://your.host.name:port를 사용합니다. listeners=PLAINTEXT://:9093 # Producer와 Consumer가 접근할 호스트와 포트를 지정, 기본값은 listeners를 사용합니다. advertised.listeners=PLAINTEXT://localhost:9093하지만 필자는 taskKill을 사용하여 9092를 죽이고, 다시 실행하도록 하겠습니다.
일반적으로 Producer에서 메시지 전송 시 Topic에 저장되는데, 이러한 Topic은 생성해줘야 합니다.
.\bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --create --topic quickstart-events --partitions 1

.\bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --list

정상적으로 생성된 'test-events' 라는 네임밍의 Topic이 list에 조회됩니다.
.\bin\windows\kafka-topics.bat --bootstrap-server localhost:9092--describe --topic quickstart-events

메시지 생성
$ .\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test-events

메시지 소비
$KAFKA_HOME/bin/windows/kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic quickstart-events --from-beginning

모든 이벤트관련 내용들은 Kafka 서버 콘솔에 로깅됩니다.
[2024-04-08 16:18:15,786] INFO [LogLoader partition=test-events-0, dir=C:\tmp\kafka-logs] Loading producer state till offset 0 with message format version 2 (kafka.log.UnifiedLog$)
[2024-04-08 16:18:15,787] INFO Created log for partition test-events-0 in C:\tmp\kafka-logs\test-events-0 with properties {} (kafka.log.LogManager)
[2024-04-08 16:18:15,788] INFO [Partition test-events-0 broker=0] No checkpointed highwatermark is found for partition test-events-0 (kafka.cluster.Partition)
[2024-04-08 16:18:15,788] INFO [Partition test-events-0 broker=0] Log loaded for partition test-events-0 with initial high watermark 0 (kafka.cluster.Partition)
[2024-04-08 16:22:15,931] INFO [GroupMetadataManager brokerId=0] Group console-consumer-30618 transitioned to Dead in generation 2 (kafka.coordinator.group.GroupMetadataManager)
[2024-04-08 16:28:10,550] INFO [GroupCoordinator 0]: Dynamic member with unknown member id joins group console-consumer-33537 in Empty state. Created a new member id console-consumer-33410579-728d-4093-8fae-57f7183085ac and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
[2024-04-08 16:28:10,554] INFO [GroupCoordinator 0]: Preparing to rebalance group console-consumer-33537 in state PreparingRebalance with old generation 0 (__consumer_offsets-32) (reason: Adding new member console-consumer-33410579-728d-4093-8fae-57f7183085ac with group instance id None; client reason: need to re-join with the given member-id: console-consumer-33410579-728d-4093-8fae-57f7183085ac) (kafka.coordinator.group.GroupCoordinator)
[2024-04-08 16:28:10,557] INFO [GroupCoordinator 0]: Stabilized group console-consumer-33537 generation 1 (__consumer_offsets-32) with 1 members (kafka.coordinator.group.GroupCoordinator)
[2024-04-08 16:28:10,569] INFO [GroupCoordinator 0]: Assignment received from leader console-consumer-33410579-728d-4093-8fae-57f7183085ac for group console-consumer-33537 for generation 1. The group has 1 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)
두번째 시나리오로 카프카 커넥트를 진행해보겠습니다.

카프카 커넥트(Kafka Connect)는 데이터를 자유롭게 export, import할 수 있는 기능을 말합니다.
별다른 요청 없이 Configuration 파일만으로도 데이터를 다른 서비스에 전달해줄 수 있는 역할을 합니다.
Standalone 모드 또는 Distribution mode를 지원해줍니다.
커스텀 Connector를 만들어 내부적으로 다양한 형태의 플러그인과 조합하여 다양한 형태의 리소스에 보내는 것이 가능합니다.
위의 그림처럼 카프카 커넥트(Connect)는 카프카 클러스터를 먼저 구성한 뒤, 양 옆에 배치가 가능합니다.
위의 커넥트 실습을 위해서 하나의 DB에서 또다른 DB로 값을 넣는 예제를 구현할 것입니다.
이를 위해서 새로운 DBMS를 설치해줘야 하는데, 이를 Maria DB로 구현해보겠습니다.
💡 Maria DB 다운로드
다운로드 후 로컬 시스템에 Maria DB가 정상적으로 등록되었는지 확인합니다.

이번에는 Orders Microservice에서 MariaDB를 연동하도록 작업을 수행하겠습니다.
검색창에 MySQL Client를 검색하신 뒤 기동하면 다음과 같이 비밀번호 입력창이 나오게 됩니다.
비밀번호를 정상 입력하면 정상적으로 데이터베이스 접속이 됩니다.

그 다음으로, mydb 라는 데이터베이스를 생성해줍니다. create database mydb 명령어를 입력하면, database를 생성하게 됩니다.

그리고, use mydb 를 입력하여 해당 데이터베이스 사용 상태로 변경해줍니다.
implementation 'org.mariadb.jdbc:mariadb-java-client'
<기존의 H2 Console>

위 설정을 다음과 같이 변경해줍니다.

Test Connection 결과, 다음과 같이 오류가 발생했습니다.

오류를 확인해보니 해당 드라이버와 JDBC가 맞지 않다는 오류입니다.
해서, 다음과 같이 JDBC URL을 mariadb로 변경하니 정상적으로 연동 되었습니다.

정확한 원인은 추후 찾아보도록 하겠습니다.
H2 Console에 접속하여 다음과 같이 create 문을 사용하여 users 라는 테이블을 mariadb에 생성해줍니다.
create table users(
id int auto_increment primary key,
user_id varchar(20),
pw varchar(20),
name varchar(20),
created_at datetime default NOW()
);
다음과 같이 MySQL Client에서 table이 생성된 것을 확인할 수 있습니다.

POST 요청
{
"name" : "my-source-connect",
"config" : {
"connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url":"jdbc:mariadb://localhost:3306/mydb",
"connection.user":"root",
"connection.password":"root",
"mode": "incrementing",
"incrementing.column.name" : "id",
"table.whitelist":"mydb.users",
"topic.prefix" : "my_topic_",
"tasks.max" : "1"
}
}
GET 요청 - 전체 조회


GET 요청 - 상세 조회


DELETE 요청 - 커넥터 삭제

POST 요청

{
"name" : "my-sink-connect",
"config" : {
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url":"jdbc:mariadb://localhost:3306/mydb",
"connection.user":"root",
"connection.password":"root",
"auto.create" : "true",
"auto.evolve" : "true",
"delete.enabled" : "false",
"tasks.max" : "1",
"topics" : "my_topic_users"
}
}


즉, DB 내부에 users에 값을 변경하였지만, 실제로 my_topic_users에도 변경 데이터값이 topic을 통해서 들어오게 되는 것입니다.

실제 Topic에 쌓인 데이터


Connector에서 사용하는 양식의 데이터를 Producer 통해서 Topic으로 전달합니다.

Consumer에서 해당 Topic을 전달받았습니다.
그 다음 users 라는 db 테이블을 확인해보면, 데이터가 없습니다. -> users는 source connect 되어 있기 때문에, topic에서 데이터를 받지 않습니다.

마지막으로, my_topic_users를 확인해보면, 데이터가 추가된 것을 확인할 수 있습니다.
