[MSA] 데이터 동기화를 위한 Apache Kafka의 활용(1)

Kim Hyen Su·2024년 4월 8일

MSA

목록 보기
14/18
post-thumbnail

1. Apache Kafka 설치

Apache Kafka HomePage

공식 다운로드 사이트

해당 사이트로 들어가셔서 다운로드 및 압축을 풀어주시면 됩니다.

카프카는 플랫폼 별로 구분되지 않고, 내부적으로 각 OS 마다 다른 명령어를 수행할 수 있도록 구성돼있습니다.

Direcotry Tree

압축을 해제하면 다음과 같은 디렉토리로 구성돼있습니다.

  • bin(ary) : 각 종 실행 명령어가 담긴 폴더.
  • config : 설정이 들어간 폴더.

config

  • connect : 다른 Target App.과 연결관련 설정
  • zookeeper : 주키퍼 관련 연동 설정

bin

  • Mac과 같은 Shellscript 명령어

  • 윈도우즈 명령어

2. Apache Kafka 사용

🔖 Kafka 관련 plug-in

📜 EcoSystem - Kafka Client

  • kafka에 데이터를 등록하고 Consumer에서 해당 데이터를 전송 받는 형태의 시나리오를 말합니다.

시나리오 테스트 전 Kafka 서버를 기동해보도록 하겠습니다.

Kafka를 관리하기 위한 코디네이터인 Zookeeper를 먼저 기동해줘야 합니다.

zookeeper 기동

poweshell에 다음과 같은 명령어를 입력해줍니다.

$.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

아래와 같이 2181 port로 zookeeper가 정상 실행됩니다.

[2024-04-08 15:58:23,252] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2024-04-08 15:58:23,262] INFO Using org.apache.zookeeper.server.watch.WatchManager as watch manager (org.apache.zookeeper.server.watch.WatchManagerFactory)
[2024-04-08 15:58:23,262] INFO Using org.apache.zookeeper.server.watch.WatchManager as watch manager (org.apache.zookeeper.server.watch.WatchManagerFactory)
[2024-04-08 15:58:23,263] INFO zookeeper.snapshotSizeFactor = 0.33 (org.apache.zookeeper.server.ZKDatabase)
[2024-04-08 15:58:23,263] INFO zookeeper.commitLogCount=500 (org.apache.zookeeper.server.ZKDatabase)
[2024-04-08 15:58:23,266] INFO zookeeper.snapshot.compression.method = CHECKED (org.apache.zookeeper.server.persistence.SnapStream)
[2024-04-08 15:58:23,277] INFO Reading snapshot \tmp\zookeeper\version-2\snapshot.5b (org.apache.zookeeper.server.persistence.FileSnap)
[2024-04-08 15:58:23,280] INFO The digest in the snapshot has digest version of 2, with zxid as 0x5b, and digest value as 50122508712 (org.apache.zookeeper.server.DataTree)
[2024-04-08 15:58:23,301] INFO ZooKeeper audit is disabled. (org.apache.zookeeper.audit.ZKAuditProvider)
[2024-04-08 15:58:23,301] INFO 130 txns loaded in 12 ms (org.apache.zookeeper.server.persistence.FileTxnSnapLog)
[2024-04-08 15:58:23,302] INFO Snapshot loaded in 38 ms, highest zxid is 0xdd, digest is 268578072849 (org.apache.zookeeper.server.ZKDatabase)
[2024-04-08 15:58:23,302] INFO Snapshotting: 0xdd to \tmp\zookeeper\version-2\snapshot.dd (org.apache.zookeeper.server.persistence.FileTxnSnapLog)
[2024-04-08 15:58:23,304] INFO Snapshot taken in 2 ms (org.apache.zookeeper.server.ZooKeeperServer)
[2024-04-08 15:58:23,314] INFO zookeeper.request_throttler.shutdownTimeout = 10000 ms (org.apache.zookeeper.server.RequestThrottler)
[2024-04-08 15:58:23,314] INFO PrepRequestProcessor (sid:0) started, reconfigEnabled=false (org.apache.zookeeper.server.PrepRequestProcessor)
[2024-04-08 15:58:23,326] INFO Using checkIntervalMs=60000 maxPerMinute=10000 maxNeverUsedIntervalMs=0 (org.apache.zookeeper.server.ContainerManager)

kafka 기동

$.\bin\windows\kafka-server-start.bat .\config\server.properties

다음으로 Kafka를 기동해줍니다.

[2024-04-08 16:12:16,030] INFO [ExpirationReaper-0-AlterAcls]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2024-04-08 16:12:16,034] INFO [Controller id=0, targetBrokerId=0] Client requested connection close from node 0 (org.apache.kafka.clients.NetworkClient)
[2024-04-08 16:12:16,079] INFO [/config/changes-event-process-thread]: Starting (kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread)
[2024-04-08 16:12:16,093] INFO [SocketServer listenerType=ZK_BROKER, nodeId=0] Enabling request processing. (kafka.network.SocketServer)
[2024-04-08 16:12:16,094] INFO Awaiting socket connections on 0.0.0.0:9093. (kafka.network.DataPlaneAcceptor)
[2024-04-08 16:12:16,098] INFO Kafka version: 3.7.0 (org.apache.kafka.common.utils.AppInfoParser)
[2024-04-08 16:12:16,098] INFO Kafka commitId: 2ae524ed625438c5 (org.apache.kafka.common.utils.AppInfoParser)
[2024-04-08 16:12:16,098] INFO Kafka startTimeMs: 1712560336095 (org.apache.kafka.common.utils.AppInfoParser)
[2024-04-08 16:12:16,100] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
[2024-04-08 16:12:16,175] INFO [zk-broker-0-to-controller-forwarding-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 0 rack: null) (kafka.server.NodeToControllerRequestThread)

⚠️ Trouble-Shooting

카프카 서버 기동 중 다음과 같은 오류 메시지가 발생했습니다.

[2024-04-05 15:18:24,596] ERROR Unable to start acceptor for ListenerName(PLAINTEXT) (kafka.network.DataPlaneAcceptor)
org.apache.kafka.common.KafkaException: Socket server failed to bind to 0.0.0.0:9092: Address already in use: bind.
        at kafka.network.Acceptor.openServerSocket(SocketServer.scala:734)
        at kafka.network.Acceptor.liftedTree1$1(SocketServer.scala:637)
        at kafka.network.Acceptor.start(SocketServer.scala:632)
        at kafka.network.SocketServer.$anonfun$enableRequestProcessing$2(SocketServer.scala:222)
        at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
        at java.base/java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:887)
        at java.base/java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2325)
        at kafka.network.SocketServer.chainAcceptorFuture$1(SocketServer.scala:215)
        at kafka.network.SocketServer.$anonfun$enableRequestProcessing$5(SocketServer.scala:229)
        at java.base/java.util.concurrent.ConcurrentHashMap$ValuesView.forEach(ConcurrentHashMap.java:4780)
        at kafka.network.SocketServer.enableRequestProcessing(SocketServer.scala:229)
        at kafka.server.KafkaServer.startup(KafkaServer.scala:621)
        at kafka.Kafka$.main(Kafka.scala:112)
        at kafka.Kafka.main(Kafka.scala)
Caused by: java.net.BindException: Address already in use: bind
        at java.base/sun.nio.ch.Net.bind0(Native Method)
        at java.base/sun.nio.ch.Net.bind(Net.java:555)
        at java.base/sun.nio.ch.ServerSocketChannelImpl.netBind(ServerSocketChannelImpl.java:337)
        at java.base/sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:294)
        at java.base/sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:89)
        at kafka.network.Acceptor.openServerSocket(SocketServer.scala:730)
        ... 13 more

내용을 확인해보니, 9092 port가 이미 사용되고 있다는 내용의 오류 였습니다.

이러한 경우, 다음과 같이 kafka 실행 설정에 들어가서 port 번호를 수정할 수 있습니다.

# server.properties 수정
# Broker가 사용하는 호스트와 포트 지정, 형식은  PLAINTEXT://your.host.name:port를 사용합니다.
listeners=PLAINTEXT://:9093
# Producer와 Consumer가 접근할 호스트와 포트를 지정, 기본값은 listeners를 사용합니다.
advertised.listeners=PLAINTEXT://localhost:9093

하지만 필자는 taskKill을 사용하여 9092를 죽이고, 다시 실행하도록 하겠습니다.

Topic 생성

일반적으로 Producer에서 메시지 전송 시 Topic에 저장되는데, 이러한 Topic은 생성해줘야 합니다.

.\bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --create --topic quickstart-events --partitions 1

  • quickstart-events : Topic-name 설정
  • bootstrap-server : 단일서버
  • localhosr:9092 : kafka 포트
  • partitions 1 : partition 1개 생성

Topic 목록 확인

.\bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --list

정상적으로 생성된 'test-events' 라는 네임밍의 Topic이 list에 조회됩니다.

Topic 상세 정보 확인

.\bin\windows\kafka-topics.bat --bootstrap-server localhost:9092--describe --topic quickstart-events

  • Partition : 파티션 갯수
  • Leader : 리더 파티션 갯수
  • Replicas : 팔로우 파티션 갯수
  • Isr : Isr 갯수

Kafka Producer / Consumer 테스트

  • 메시지 생성

    $ .\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test-events

  • 메시지 소비
    $KAFKA_HOME/bin/windows/kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic quickstart-events --from-beginning

모든 이벤트관련 내용들은 Kafka 서버 콘솔에 로깅됩니다.

[2024-04-08 16:18:15,786] INFO [LogLoader partition=test-events-0, dir=C:\tmp\kafka-logs] Loading producer state till offset 0 with message format version 2 (kafka.log.UnifiedLog$)
[2024-04-08 16:18:15,787] INFO Created log for partition test-events-0 in C:\tmp\kafka-logs\test-events-0 with properties {} (kafka.log.LogManager)
[2024-04-08 16:18:15,788] INFO [Partition test-events-0 broker=0] No checkpointed highwatermark is found for partition test-events-0 (kafka.cluster.Partition)
[2024-04-08 16:18:15,788] INFO [Partition test-events-0 broker=0] Log loaded for partition test-events-0 with initial high watermark 0 (kafka.cluster.Partition)
[2024-04-08 16:22:15,931] INFO [GroupMetadataManager brokerId=0] Group console-consumer-30618 transitioned to Dead in generation 2 (kafka.coordinator.group.GroupMetadataManager)
[2024-04-08 16:28:10,550] INFO [GroupCoordinator 0]: Dynamic member with unknown member id joins group console-consumer-33537 in Empty state. Created a new member id console-consumer-33410579-728d-4093-8fae-57f7183085ac and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
[2024-04-08 16:28:10,554] INFO [GroupCoordinator 0]: Preparing to rebalance group console-consumer-33537 in state PreparingRebalance with old generation 0 (__consumer_offsets-32) (reason: Adding new member console-consumer-33410579-728d-4093-8fae-57f7183085ac with group instance id None; client reason: need to re-join with the given member-id: console-consumer-33410579-728d-4093-8fae-57f7183085ac) (kafka.coordinator.group.GroupCoordinator)
[2024-04-08 16:28:10,557] INFO [GroupCoordinator 0]: Stabilized group console-consumer-33537 generation 1 (__consumer_offsets-32) with 1 members (kafka.coordinator.group.GroupCoordinator)
[2024-04-08 16:28:10,569] INFO [GroupCoordinator 0]: Assignment received from leader console-consumer-33410579-728d-4093-8fae-57f7183085ac for group console-consumer-33537 for generation 1. The group has 1 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)

📜 EcoSystem - Kafka Connect

두번째 시나리오로 카프카 커넥트를 진행해보겠습니다.

카프카 커넥트(Kafka Connect)는 데이터를 자유롭게 export, import할 수 있는 기능을 말합니다.

별다른 요청 없이 Configuration 파일만으로도 데이터를 다른 서비스에 전달해줄 수 있는 역할을 합니다.

Standalone 모드 또는 Distribution mode를 지원해줍니다.

  • RESTful API 를 지원하기 때문에, postman과 같이 API 테스트 툴을 사용하여 커넥터를 생성/삭제 가 가능합니다.
  • Stream 또는 Batch 형태로 데이터 전송이 가능합니다.

커스텀 Connector를 만들어 내부적으로 다양한 형태의 플러그인과 조합하여 다양한 형태의 리소스에 보내는 것이 가능합니다.

  • File -> DB, DB -> File, 스토리지1 -> 스토리지2

위의 그림처럼 카프카 커넥트(Connect)는 카프카 클러스터를 먼저 구성한 뒤, 양 옆에 배치가 가능합니다.

  • 카프카 클러스터에 import 해오는 커넥트를 Source 커넥트, export 하는 커넥트를 Sink 커넥트라고 합니다.

위의 커넥트 실습을 위해서 하나의 DB에서 또다른 DB로 값을 넣는 예제를 구현할 것입니다.

이를 위해서 새로운 DBMS를 설치해줘야 하는데, 이를 Maria DB로 구현해보겠습니다.

💡 Maria DB 다운로드

다운로드 후 로컬 시스템에 Maria DB가 정상적으로 등록되었는지 확인합니다.

Orders Microservice에 MariaDB 연동하기

이번에는 Orders Microservice에서 MariaDB를 연동하도록 작업을 수행하겠습니다.

MySQL Client 사용 - windows

검색창에 MySQL Client를 검색하신 뒤 기동하면 다음과 같이 비밀번호 입력창이 나오게 됩니다.

비밀번호를 정상 입력하면 정상적으로 데이터베이스 접속이 됩니다.

그 다음으로, mydb 라는 데이터베이스를 생성해줍니다. create database mydb 명령어를 입력하면, database를 생성하게 됩니다.

그리고, use mydb 를 입력하여 해당 데이터베이스 사용 상태로 변경해줍니다.

Order-Service Dependencies 추가

implementation 'org.mariadb.jdbc:mariadb-java-client'

H2 Console 설정 변경

<기존의 H2 Console>

위 설정을 다음과 같이 변경해줍니다.

  • Saved Settings - Generic MySQL
  • Driver Class : org.mariadb.jdbc.Driver
  • JDBC URL : jdc:mysql://localhost:3306/mydb
  • Username : root
  • Password : 비밀번호

Test Connection 결과, 다음과 같이 오류가 발생했습니다.

오류를 확인해보니 해당 드라이버와 JDBC가 맞지 않다는 오류입니다.

해서, 다음과 같이 JDBC URL을 mariadb로 변경하니 정상적으로 연동 되었습니다.

정확한 원인은 추후 찾아보도록 하겠습니다.

H2 Console에 접속하여 다음과 같이 create 문을 사용하여 users 라는 테이블을 mariadb에 생성해줍니다.

create table users(
 id int auto_increment primary key,
 user_id varchar(20),
 pw varchar(20),
 name varchar(20),
 created_at datetime default NOW()
);

다음과 같이 MySQL Client에서 table이 생성된 것을 확인할 수 있습니다.

Kafka Connect 설치

Source Connect

POST 요청

{
    "name" : "my-source-connect",
    "config" : {
    "connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url":"jdbc:mariadb://localhost:3306/mydb",
    "connection.user":"root",
    "connection.password":"root",
    "mode": "incrementing",
    "incrementing.column.name" : "id",
    "table.whitelist":"mydb.users",
    "topic.prefix" : "my_topic_",
    "tasks.max" : "1"
    }
}

GET 요청 - 전체 조회

GET 요청 - 상세 조회

DELETE 요청 - 커넥터 삭제

Sink Connect

POST 요청

{
    "name" : "my-sink-connect",
    "config" : {
        "connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url":"jdbc:mariadb://localhost:3306/mydb",
        "connection.user":"root",
        "connection.password":"root",
        "auto.create" : "true",
        "auto.evolve" : "true",
        "delete.enabled" : "false",
        "tasks.max" : "1",
        "topics" : "my_topic_users"
    }
}

  • 자동으로 maria db에 해당 테이블이 생성됐습니다.
  • 테이블 내부의 데이터는 기존의 users와 동일한 데이터를 가지고 있습니다.

즉, DB 내부에 users에 값을 변경하였지만, 실제로 my_topic_users에도 변경 데이터값이 topic을 통해서 들어오게 되는 것입니다.

실제 Topic에 쌓인 데이터

Producer를 통해서 Topic에 데이터 넣기.

Connector에서 사용하는 양식의 데이터를 Producer 통해서 Topic으로 전달합니다.

Consumer에서 해당 Topic을 전달받았습니다.

그 다음 users 라는 db 테이블을 확인해보면, 데이터가 없습니다. -> users는 source connect 되어 있기 때문에, topic에서 데이터를 받지 않습니다.

마지막으로, my_topic_users를 확인해보면, 데이터가 추가된 것을 확인할 수 있습니다.

profile
백엔드 서버 엔지니어

0개의 댓글