kafka connect로 MongoDB Change Event 감지해보기 #1

ssongkim·2023년 3월 19일
0

Overview

mongodb는 자체적으로도 kafka connector을 지원하기도 하고 데비지움에서도 connector를 제공한다.

데비지움도 믿을만하지만 몽고디비 자체로 제공해주는 source connector가 더 믿음이 가지 않을까?

우리는 몽고디비에서 공식적으로 제공하는 source connector를 사용하며 튜토리얼을 따라해본다. 전반적인 내용은 공식 문서를 참고하였으나 오타가 있거나 docker-compose 구성이 잘못되어 있는 등 공식문서대로 진행하면 제대로 동작하지 않아 중간에 애를 많이 먹었다. 해결 과정을 기술한다.

우리는 docker desktop 대신 colima를 사용한다. colima 개발환경을 사전에 먼저 구성해주어야 한다.

참고문서

다음 공식 문서를 참고하였다.

몽고디비 제공 source connector
https://www.mongodb.com/docs/kafka-connector/current/tutorials/source-connector/

시작하기

몽고디비에서 제공해주는 connector를 사용하는 튜토리얼 문서를 참고하여 진행하며 튜토리얼 진행 중에 발생했던 문제와 그 해결 과정을 기술한다.

colima start

colima start --network-address --cpu 4 --memory 8 --edit
 
// provision 설정
provision:
  - mode: system
    script: |
        sysctl -w vm.max_map_count=1000000
        echo never >/sys/kernel/mm/transparent_hugepage/enabled

git clone

git clone <https://github.com/mongodb-university/kafka-edu.git>

docker-compose 분석

git: https://github.com/mongodb-university/kafka-edu/blob/main/docs-examples/mongodb-kafka-base/docker-compose.yml

  • zookeeper
    주키퍼 서버이다.
  • broker
    카프카 브로커이다.
  • connect
    카프카 커넥트를 도커화한 것이다.
    connect.Dockerfile에서 이미지를 빌드하여 사용한다.
    confluent-hub 명령어를 이용해 도커파일 내부에서 몽고디비 전용파일을 다운받아 이미지를 구성함을 확인
    REST_PORT를 볼 수 있는데 카프카 커넥트는 rest api를 이용해 편리함 제공한다. (커넥터 조회, 커넥터 플러그인 리스트 조회 등)
  • `rest-proxy
    카프카 클러스터를 위한 RESTful interface application을 오픈소스로 제공하고 있습니다. 직접 코드를 짜지 않고 범용적으로 사용되는 http을 사용해서 데이터를 넣고 뺄 수 있다는 점이 독특
  • schema-registry
    카프카 클러스터 외부에서 스키마만을 관리하는 기능을 지닌 서비스
    Avro®, JSON 스키마 및 Protobuf 스키마를 저장하고 검색하기 위한 RESTful 인터페이스를 제공
    Producer가 메시지를 기존에 보내던 것과 다른 스키마 형식으로 보낸다면 Consumer는 바뀐 메시지를 받았을 때 문제가 크게 발생할 수도 있다. 이런 일을 방지하기 위해서 스키마 레지스트리를 사용할 수 있다.
  • mongo1
    몽고디비이다.
    mongo.Dockerfile에서 이미지를 빌드해서 사용한다.
ADD change_streams /tutorials/change_streams
ADD source_connector /tutorials/source_connector
ADD sink_connector /tutorials/sink_connector
ADD cdc_handler /tutorials/cdc_handler
ADD time_series /tutorials/time_series

위를 보면 다양한 정보를 호스트에서 몽고컨테이너로 {db}/~~ 형태로 add 해줌을 확인 가능하다.
source connector, sink connector 연결 시 필요한 json 파일 예제를 담고있다.
이외에 불필요한 튜토리얼 관련 명령어들이 있는데 실제 사용할 땐 싹 다 제거해도 될듯하다.

  • mongo1-setup
    mongodb에 처음 레플리카 세팅작업을 해주는 명령을 실행해주고 죽는 컨테이너로 추정된다.

docker-compose up

cd kafka-edu/docs-examples/mongodb-kafka-base
 
docker-compose -p mongo-kafka up -d

튜토리얼대로 하다가 처음에 제대로 띄워지지 않는 현상 발생

  • failed to solve: process "/bin/sh -c apt-get -y upgrade" did not complete successfully: exit code: 100 에러가 떠서 해당 스크립트를 mongo.Dockerfile에서 제거하여 해결하였다.
  • mongo1-setup 컨테이너 warning 발생, 로그를 떠보니 vm_max_map_count is too row 이랑 always를 never로 바꾸길 권장한다는 로그가 뜸, 그래서 콜리마 시작 시 colima start --network-address --edit
provision:
  - mode: system
    script: |
        sysctl -w vm.max_map_count=1000000
        echo never >/sys/kernel/mm/transparent_hugepage/enabled
  • 카프카 브로커 host를 잡지 못해 스키마레지스트리, 커넥트 컨테이너 실행 실패먼저 모든 CONNECT_BOOTSTRAP_SERVER: "broker:29092" → CONNECT_BOOTSTRAP_SERVER: "PLAINTEXT://broker:29092로 변경하여 host를 잡아주었다.
  • 스키마 레지스트리SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL 부분 삭제
  • connect컨테이너 종료 flag를 보니 exited 137 발생, 이는 OOM을 의미한다. 콜리마 시작 시 --cpu 4 --memory 8 추가

kafka connect status 확인

몽고디비에서 커넥터 상태를 직접확인하는 것이 아니라 status 유틸을 이용하여 curl로 kafka connect에 요청을 날려 상태를 확인하는 것이니 오해말자.

docker exec mongo1 status
 
Kafka topics:
 
    "topic": "docker-connect-status",
    "topic": "_schemas",
    "topic": "__consumer_offsets",
    "topic": "docker-connect-offsets",
    "topic": "docker-connect-configs",
 
The status of the connectors:
 
Currently configured connectors
 
[]
 
Version of MongoDB Connector for Apache Kafka installed:

카프카 커넥트 관련 토픽이 생성된 것 확인 가능하다

하지만 connectors 배열이 비어있다, 왜냐하면, 커넥트 구성은 하였는데 해당 커넥트에 연결된 커넥터가 없기 때문이다.

source connector 붙이기

컨테이너 접속

docker exec -it mongo1 /bin/bash

명령어 실행

해당 simple_source.json은 mongo.Dockerfile 부분에서 임의로 넣었음을 확인할 수 있다. 해당 파일을 확인한다.
curl을 통해 source connector를 연결하는 connect에서 제공하는 rest API를 call하는 모습이다.

curl -X POST -H "Content-Type: application/json" \\
-d @/tutorials/source_connector/simple_source.json <http://connect:8083/connectors> -w "\\n"

에러발생

{
  "error_code": 500,
  "message": "Failed to find any class that implements Connector and which name matches com.mongodb.kafka.connect.MongoSourceConnector, available connectors are: PluginDesc{klass=class org.apache.kafka.connect.mirror.MirrorCheckpointConnector, name='org.apache.kafka.connect.mirror.MirrorCheckpointConnector', version='7.3.2-ccs', encodedVersion=7.3.2-ccs, type=source, typeName='source', location='file:/usr/share/java/kafka/'}, PluginDesc{klass=class org.apache.kafka.connect.mirror.MirrorHeartbeatConnector, name='org.apache.kafka.connect.mirror.MirrorHeartbeatConnector', version='7.3.2-ccs', encodedVersion=7.3.2-ccs, type=source, typeName='source', location='file:/usr/share/java/kafka/'}, PluginDesc{klass=class org.apache.kafka.connect.mirror.MirrorSourceConnector, name='org.apache.kafka.connect.mirror.MirrorSourceConnector', version='7.3.2-ccs', encodedVersion=7.3.2-ccs, type=source, typeName='source', location='file:/usr/share/java/kafka/'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSinkConnector, name='org.apache.kafka.connect.tools.MockSinkConnector', version='7.3.2-ccs', encodedVersion=7.3.2-ccs, type=sink, typeName='sink', location='file:/usr/share/java/confluent-control-center/'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSourceConnector, name='org.apache.kafka.connect.tools.MockSourceConnector', version='7.3.2-ccs', encodedVersion=7.3.2-ccs, type=source, typeName='source', location='file:/usr/share/java/confluent-control-center/'}, PluginDesc{klass=class org.apache.kafka.connect.tools.SchemaSourceConnector, name='org.apache.kafka.connect.tools.SchemaSourceConnector', version='7.3.2-ccs', encodedVersion=7.3.2-ccs, type=source, typeName='source', location='file:/usr/share/java/confluent-control-center/'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSinkConnector, name='org.apache.kafka.connect.tools.VerifiableSinkConnector', version='7.3.2-ccs', encodedVersion=7.3.2-ccs, type=sink, typeName='sink', location='file:/usr/share/java/confluent-control-center/'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSourceConnector, name='org.apache.kafka.connect.tools.VerifiableSourceConnector', version='7.3.2-ccs', encodedVersion=7.3.2-ccs, type=source, typeName='source', location='file:/usr/share/java/confluent-control-center/'}"
}

왜냐?

curl <http://connect:8083/connector-plugins>

이거로 확인해보니 해당 커넥터 플러그인이 없었음

confluentinc/cp-kafka-connect-base 라는 이름을 가진 이미지가 도커 허브에 있어서 내부의 Dockerfile 빌드를 진행하지 않고 도커허브의 이미지를 사용하여 dockerfile에는 mongo plugin을 설치하였지만 반영이 되지 않았던 것,

docker-compose에서 connect.Dockerfile을 사용하는 해당 서비스의 이미지를 임의의 이름으로 변경하여 로컬빌드를 사용하도록 수정(dcollect-cp-kafka-connect-base:1.0)

source connector 확인

몽고디비에서 커넥터 상태를 직접확인하는 것이 아니라 status 유틸을 이용하여 curl로 kafka connect에 요청을 날려 상태를 확인하는 것이니 오해말자.

status

Kafka topics:
 
    "topic": "docker-connect-status",
    "topic": "_schemas",
    "topic": "__consumer_offsets",
    "topic": "docker-connect-offsets",
    "topic": "docker-connect-configs",
 
The status of the connectors:
 
source  |  mongo-simple-source  |  RUNNING  |  RUNNING  |  com.mongodb.kafka.connect.MongoSourceConnector
 
Currently configured connectors
 
[
  "mongo-simple-source"
]
 
Version of MongoDB Connector for Apache Kafka installed:
 
{"class":"com.mongodb.kafka.connect.MongoSinkConnector","type":"sink","version":"1.8.0"}
{"class":"com.mongodb.kafka.connect.MongoSourceConnector","type":"source","version":"1.8.0"}

connector 연결이 된 것을 확인할 수 있다.

MongoDB Change Event 감지 확인해보기

mongodb 접속

컨테이너 내부에서 해당 명령어를 실행해 몽고디비와 연결한다.

mongosh "mongodb://mongo1"

document 생성

해당 명령어를 통해 document를 생성해본다. (대소문자 주의!)

use Tutorial1

insert 2번, update 1번, delete 1번에 해당하는 쿼리를 날린다.

db.orders.insertOne( { 'order_id' : 1, 'item' : 'coffee' } )
db.orders.insertOne( { 'order_id' : 1, 'item' : 'coffee' } )

// 컬렉션 내용 확인하여 프라이머리 키 확인
 
db.orders.find()
 
// update
db.orders.updateOne({
_id: ObjectId("641172ed232c26415652396b")
}, {
$set: {
item: "asdasdsd"
}
})
 
// delete
db.orders.deleteOne({'item': 'asdasdsd'})

해당 컬렉션 변화 이벤트 메시지가 담기는 토픽 생성 확인

status

status를 쳐보면 "topic": "Tutorial1.orders", 이 생겨난 것을 확인할 수 있다.

토픽 들여다보기

다음 유틸 명령어를 통해 해당 토픽 안의 메시지를 확인해볼 수 있다.

kc Tutorial1.orders
MongoDB Kafka Sandbox $kc Tutorial1.orders
% Auto-selecting Consumer mode (use -P or -C to override)
 
Partition: 0    Offset: 0
 
Key (198 bytes):
 
{"schema":{"type":"string","optional":false},"payload":"{\\"_id\\": {\\"_data\\": \\"82641172ED000000022B022C0100296E5A10045D5FA308B89F42A59CE45850694256C946645F69640064641172ED232C26415652396B0004\\"}}"}
 
Value (572 bytes):
 
{"schema":{"type":"string","optional":false},"payload":"{\\"_id\\": {\\"_data\\": \\"82641172ED000000022B022C0100296E5A10045D5FA308B89F42A59CE45850694256C946645F69640064641172ED232C26415652396B0004\\"}, \\"operationType\\": \\"insert\\", \\"clusterTime\\": {\\"$timestamp\\": {\\"t\\": 1678865133, \\"i\\": 2}}, \\"wallTime\\": {\\"$date\\": 1678865133025}, \\"fullDocument\\": {\\"_id\\": {\\"$oid\\": \\"641172ed232c26415652396b\\"}, \\"order_id\\": 1, \\"item\\": \\"coffee\\"}, \\"ns\\": {\\"db\\": \\"Tutorial1\\", \\"coll\\": \\"orders\\"}, \\"documentKey\\": {\\"_id\\": {\\"$oid\\": \\"641172ed232c26415652396b\\"}}}"}
-------------------------
 
Partition: 0    Offset: 1
 
Key (198 bytes):
 
{"schema":{"type":"string","optional":false},"payload":"{\\"_id\\": {\\"_data\\": \\"82641173FD000000012B022C0100296E5A10045D5FA308B89F42A59CE45850694256C946645F69640064641173FDF1C24928892855D80004\\"}}"}
 
Value (578 bytes):
 
{"schema":{"type":"string","optional":false},"payload":"{\\"_id\\": {\\"_data\\": \\"82641173FD000000012B022C0100296E5A10045D5FA308B89F42A59CE45850694256C946645F69640064641173FDF1C24928892855D80004\\"}, \\"operationType\\": \\"insert\\", \\"clusterTime\\": {\\"$timestamp\\": {\\"t\\": 1678865405, \\"i\\": 1}}, \\"wallTime\\": {\\"$date\\": 1678865405880}, \\"fullDocument\\": {\\"_id\\": {\\"$oid\\": \\"641173fdf1c24928892855d8\\"}, \\"order_id\\": 1, \\"item\\": \\"coffeeChange\\"}, \\"ns\\": {\\"db\\": \\"Tutorial1\\", \\"coll\\": \\"orders\\"}, \\"documentKey\\": {\\"_id\\": {\\"$oid\\": \\"641173fdf1c24928892855d8\\"}}}"}
-------------------------
 
Partition: 0    Offset: 2
 
Key (198 bytes):
 
{"schema":{"type":"string","optional":false},"payload":"{\\"_id\\": {\\"_data\\": \\"8264117566000000012B022C0100296E5A10045D5FA308B89F42A59CE45850694256C946645F69640064641172ED232C26415652396B0004\\"}}"}
 
Value (580 bytes):
 
{"schema":{"type":"string","optional":false},"payload":"{\\"_id\\": {\\"_data\\": \\"8264117566000000012B022C0100296E5A10045D5FA308B89F42A59CE45850694256C946645F69640064641172ED232C26415652396B0004\\"}, \\"operationType\\": \\"update\\", \\"clusterTime\\": {\\"$timestamp\\": {\\"t\\": 1678865766, \\"i\\": 1}}, \\"wallTime\\": {\\"$date\\": 1678865766927}, \\"ns\\": {\\"db\\": \\"Tutorial1\\", \\"coll\\": \\"orders\\"}, \\"documentKey\\": {\\"_id\\": {\\"$oid\\": \\"641172ed232c26415652396b\\"}}, \\"updateDescription\\": {\\"updatedFields\\": {\\"item\\": \\"asdasdsd\\"}, \\"removedFields\\": [], \\"truncatedArrays\\": []}}"}
-------------------------
 
Partition: 0    Offset: 3
 
Key (198 bytes):
 
{"schema":{"type":"string","optional":false},"payload":"{\\"_id\\": {\\"_data\\": \\"82641185E9000000012B022C0100296E5A10045D5FA308B89F42A59CE45850694256C946645F69640064641172ED232C26415652396B0004\\"}}"}
 
Value (462 bytes):
 
{"schema":{"type":"string","optional":false},"payload":"{\\"_id\\": {\\"_data\\": \\"82641185E9000000012B022C0100296E5A10045D5FA308B89F42A59CE45850694256C946645F69640064641172ED232C26415652396B0004\\"}, \\"operationType\\": \\"delete\\", \\"clusterTime\\": {\\"$timestamp\\": {\\"t\\": 1678869993, \\"i\\": 1}}, \\"wallTime\\": {\\"$date\\": 1678869993393}, \\"ns\\": {\\"db\\": \\"Tutorial1\\", \\"coll\\": \\"orders\\"}, \\"documentKey\\": {\\"_id\\": {\\"$oid\\": \\"641172ed232c26415652396b\\"}}}"}
-------------------------
 
% Reached end of topic Tutorial1.orders [0] at offset 4

insert와 update, delete 이벤트가 감지되어 토픽에 잘 들어온 것을 확인할 수 있다.

마무리

몽고디비 공식 문서에서 제공하는 튜토리얼을 이용하여 mongodb Change Event 감지를 kafka connect로 구현해보았다.

문제를 해결하며 완성한 예제는
https://github.com/suhongkim98/kafka-edu
에 올려놓았다.

profile
鈍筆勝聰✍️

0개의 댓글