mongodb는 자체적으로도 kafka connector을 지원하기도 하고 데비지움에서도 connector를 제공한다.
데비지움도 믿을만하지만 몽고디비 자체로 제공해주는 source connector
가 더 믿음이 가지 않을까?
우리는 몽고디비에서 공식적으로 제공하는 source connector를 사용하며 튜토리얼을 따라해본다. 전반적인 내용은 공식 문서를 참고하였으나 오타가 있거나 docker-compose 구성이 잘못되어 있는 등 공식문서대로 진행하면 제대로 동작하지 않아 중간에 애를 많이 먹었다. 해결 과정을 기술한다.
우리는 docker desktop 대신 colima를 사용한다. colima 개발환경을 사전에 먼저 구성해주어야 한다.
다음 공식 문서를 참고하였다.
몽고디비 제공 source connector
https://www.mongodb.com/docs/kafka-connector/current/tutorials/source-connector/
몽고디비에서 제공해주는 connector를 사용하는 튜토리얼 문서를 참고하여 진행하며 튜토리얼 진행 중에 발생했던 문제와 그 해결 과정을 기술한다.
colima start --network-address --cpu 4 --memory 8 --edit
// provision 설정
provision:
- mode: system
script: |
sysctl -w vm.max_map_count=1000000
echo never >/sys/kernel/mm/transparent_hugepage/enabled
git clone <https://github.com/mongodb-university/kafka-edu.git>
zookeeper
broker
connect
schema-registry
mongo1
ADD change_streams /tutorials/change_streams
ADD source_connector /tutorials/source_connector
ADD sink_connector /tutorials/sink_connector
ADD cdc_handler /tutorials/cdc_handler
ADD time_series /tutorials/time_series
위를 보면 다양한 정보를 호스트에서 몽고컨테이너로 {db}/~~ 형태로 add 해줌을 확인 가능하다.
source connector, sink connector 연결 시 필요한 json 파일 예제를 담고있다.
이외에 불필요한 튜토리얼 관련 명령어들이 있는데 실제 사용할 땐 싹 다 제거해도 될듯하다.
mongo1-setup
cd kafka-edu/docs-examples/mongodb-kafka-base
docker-compose -p mongo-kafka up -d
provision:
- mode: system
script: |
sysctl -w vm.max_map_count=1000000
echo never >/sys/kernel/mm/transparent_hugepage/enabled
몽고디비에서 커넥터 상태를 직접확인하는 것이 아니라 status 유틸을 이용하여 curl로 kafka connect에 요청을 날려 상태를 확인하는 것이니 오해말자.
docker exec mongo1 status
Kafka topics:
"topic": "docker-connect-status",
"topic": "_schemas",
"topic": "__consumer_offsets",
"topic": "docker-connect-offsets",
"topic": "docker-connect-configs",
The status of the connectors:
Currently configured connectors
[]
Version of MongoDB Connector for Apache Kafka installed:
카프카 커넥트 관련 토픽이 생성된 것 확인 가능하다
하지만 connectors 배열이 비어있다, 왜냐하면, 커넥트 구성은 하였는데 해당 커넥트에 연결된 커넥터가 없기 때문이다.
docker exec -it mongo1 /bin/bash
해당 simple_source.json은 mongo.Dockerfile 부분에서 임의로 넣었음을 확인할 수 있다. 해당 파일을 확인한다.
curl을 통해 source connector를 연결하는 connect에서 제공하는 rest API를 call하는 모습이다.
curl -X POST -H "Content-Type: application/json" \\
-d @/tutorials/source_connector/simple_source.json <http://connect:8083/connectors> -w "\\n"
{
"error_code": 500,
"message": "Failed to find any class that implements Connector and which name matches com.mongodb.kafka.connect.MongoSourceConnector, available connectors are: PluginDesc{klass=class org.apache.kafka.connect.mirror.MirrorCheckpointConnector, name='org.apache.kafka.connect.mirror.MirrorCheckpointConnector', version='7.3.2-ccs', encodedVersion=7.3.2-ccs, type=source, typeName='source', location='file:/usr/share/java/kafka/'}, PluginDesc{klass=class org.apache.kafka.connect.mirror.MirrorHeartbeatConnector, name='org.apache.kafka.connect.mirror.MirrorHeartbeatConnector', version='7.3.2-ccs', encodedVersion=7.3.2-ccs, type=source, typeName='source', location='file:/usr/share/java/kafka/'}, PluginDesc{klass=class org.apache.kafka.connect.mirror.MirrorSourceConnector, name='org.apache.kafka.connect.mirror.MirrorSourceConnector', version='7.3.2-ccs', encodedVersion=7.3.2-ccs, type=source, typeName='source', location='file:/usr/share/java/kafka/'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSinkConnector, name='org.apache.kafka.connect.tools.MockSinkConnector', version='7.3.2-ccs', encodedVersion=7.3.2-ccs, type=sink, typeName='sink', location='file:/usr/share/java/confluent-control-center/'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSourceConnector, name='org.apache.kafka.connect.tools.MockSourceConnector', version='7.3.2-ccs', encodedVersion=7.3.2-ccs, type=source, typeName='source', location='file:/usr/share/java/confluent-control-center/'}, PluginDesc{klass=class org.apache.kafka.connect.tools.SchemaSourceConnector, name='org.apache.kafka.connect.tools.SchemaSourceConnector', version='7.3.2-ccs', encodedVersion=7.3.2-ccs, type=source, typeName='source', location='file:/usr/share/java/confluent-control-center/'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSinkConnector, name='org.apache.kafka.connect.tools.VerifiableSinkConnector', version='7.3.2-ccs', encodedVersion=7.3.2-ccs, type=sink, typeName='sink', location='file:/usr/share/java/confluent-control-center/'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSourceConnector, name='org.apache.kafka.connect.tools.VerifiableSourceConnector', version='7.3.2-ccs', encodedVersion=7.3.2-ccs, type=source, typeName='source', location='file:/usr/share/java/confluent-control-center/'}"
}
왜냐?
curl <http://connect:8083/connector-plugins>
이거로 확인해보니 해당 커넥터 플러그인이 없었음
confluentinc/cp-kafka-connect-base 라는 이름을 가진 이미지가 도커 허브에 있어서 내부의 Dockerfile 빌드를 진행하지 않고 도커허브의 이미지를 사용하여 dockerfile에는 mongo plugin을 설치하였지만 반영이 되지 않았던 것,
docker-compose에서 connect.Dockerfile을 사용하는 해당 서비스의 이미지를 임의의 이름으로 변경하여 로컬빌드를 사용하도록 수정(dcollect-cp-kafka-connect-base:1.0)
몽고디비에서 커넥터 상태를 직접확인하는 것이 아니라 status 유틸을 이용하여 curl로 kafka connect에 요청을 날려 상태를 확인하는 것이니 오해말자.
status
Kafka topics:
"topic": "docker-connect-status",
"topic": "_schemas",
"topic": "__consumer_offsets",
"topic": "docker-connect-offsets",
"topic": "docker-connect-configs",
The status of the connectors:
source | mongo-simple-source | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSourceConnector
Currently configured connectors
[
"mongo-simple-source"
]
Version of MongoDB Connector for Apache Kafka installed:
{"class":"com.mongodb.kafka.connect.MongoSinkConnector","type":"sink","version":"1.8.0"}
{"class":"com.mongodb.kafka.connect.MongoSourceConnector","type":"source","version":"1.8.0"}
connector 연결이 된 것을 확인할 수 있다.
컨테이너 내부에서 해당 명령어를 실행해 몽고디비와 연결한다.
mongosh "mongodb://mongo1"
해당 명령어를 통해 document를 생성해본다. (대소문자 주의!)
use Tutorial1
insert 2번, update 1번, delete 1번에 해당하는 쿼리를 날린다.
db.orders.insertOne( { 'order_id' : 1, 'item' : 'coffee' } )
db.orders.insertOne( { 'order_id' : 1, 'item' : 'coffee' } )
// 컬렉션 내용 확인하여 프라이머리 키 확인
db.orders.find()
// update
db.orders.updateOne({
_id: ObjectId("641172ed232c26415652396b")
}, {
$set: {
item: "asdasdsd"
}
})
// delete
db.orders.deleteOne({'item': 'asdasdsd'})
status
status를 쳐보면 "topic": "Tutorial1.orders", 이 생겨난 것을 확인할 수 있다.
다음 유틸 명령어를 통해 해당 토픽 안의 메시지를 확인해볼 수 있다.
kc Tutorial1.orders
MongoDB Kafka Sandbox $kc Tutorial1.orders
% Auto-selecting Consumer mode (use -P or -C to override)
Partition: 0 Offset: 0
Key (198 bytes):
{"schema":{"type":"string","optional":false},"payload":"{\\"_id\\": {\\"_data\\": \\"82641172ED000000022B022C0100296E5A10045D5FA308B89F42A59CE45850694256C946645F69640064641172ED232C26415652396B0004\\"}}"}
Value (572 bytes):
{"schema":{"type":"string","optional":false},"payload":"{\\"_id\\": {\\"_data\\": \\"82641172ED000000022B022C0100296E5A10045D5FA308B89F42A59CE45850694256C946645F69640064641172ED232C26415652396B0004\\"}, \\"operationType\\": \\"insert\\", \\"clusterTime\\": {\\"$timestamp\\": {\\"t\\": 1678865133, \\"i\\": 2}}, \\"wallTime\\": {\\"$date\\": 1678865133025}, \\"fullDocument\\": {\\"_id\\": {\\"$oid\\": \\"641172ed232c26415652396b\\"}, \\"order_id\\": 1, \\"item\\": \\"coffee\\"}, \\"ns\\": {\\"db\\": \\"Tutorial1\\", \\"coll\\": \\"orders\\"}, \\"documentKey\\": {\\"_id\\": {\\"$oid\\": \\"641172ed232c26415652396b\\"}}}"}
-------------------------
Partition: 0 Offset: 1
Key (198 bytes):
{"schema":{"type":"string","optional":false},"payload":"{\\"_id\\": {\\"_data\\": \\"82641173FD000000012B022C0100296E5A10045D5FA308B89F42A59CE45850694256C946645F69640064641173FDF1C24928892855D80004\\"}}"}
Value (578 bytes):
{"schema":{"type":"string","optional":false},"payload":"{\\"_id\\": {\\"_data\\": \\"82641173FD000000012B022C0100296E5A10045D5FA308B89F42A59CE45850694256C946645F69640064641173FDF1C24928892855D80004\\"}, \\"operationType\\": \\"insert\\", \\"clusterTime\\": {\\"$timestamp\\": {\\"t\\": 1678865405, \\"i\\": 1}}, \\"wallTime\\": {\\"$date\\": 1678865405880}, \\"fullDocument\\": {\\"_id\\": {\\"$oid\\": \\"641173fdf1c24928892855d8\\"}, \\"order_id\\": 1, \\"item\\": \\"coffeeChange\\"}, \\"ns\\": {\\"db\\": \\"Tutorial1\\", \\"coll\\": \\"orders\\"}, \\"documentKey\\": {\\"_id\\": {\\"$oid\\": \\"641173fdf1c24928892855d8\\"}}}"}
-------------------------
Partition: 0 Offset: 2
Key (198 bytes):
{"schema":{"type":"string","optional":false},"payload":"{\\"_id\\": {\\"_data\\": \\"8264117566000000012B022C0100296E5A10045D5FA308B89F42A59CE45850694256C946645F69640064641172ED232C26415652396B0004\\"}}"}
Value (580 bytes):
{"schema":{"type":"string","optional":false},"payload":"{\\"_id\\": {\\"_data\\": \\"8264117566000000012B022C0100296E5A10045D5FA308B89F42A59CE45850694256C946645F69640064641172ED232C26415652396B0004\\"}, \\"operationType\\": \\"update\\", \\"clusterTime\\": {\\"$timestamp\\": {\\"t\\": 1678865766, \\"i\\": 1}}, \\"wallTime\\": {\\"$date\\": 1678865766927}, \\"ns\\": {\\"db\\": \\"Tutorial1\\", \\"coll\\": \\"orders\\"}, \\"documentKey\\": {\\"_id\\": {\\"$oid\\": \\"641172ed232c26415652396b\\"}}, \\"updateDescription\\": {\\"updatedFields\\": {\\"item\\": \\"asdasdsd\\"}, \\"removedFields\\": [], \\"truncatedArrays\\": []}}"}
-------------------------
Partition: 0 Offset: 3
Key (198 bytes):
{"schema":{"type":"string","optional":false},"payload":"{\\"_id\\": {\\"_data\\": \\"82641185E9000000012B022C0100296E5A10045D5FA308B89F42A59CE45850694256C946645F69640064641172ED232C26415652396B0004\\"}}"}
Value (462 bytes):
{"schema":{"type":"string","optional":false},"payload":"{\\"_id\\": {\\"_data\\": \\"82641185E9000000012B022C0100296E5A10045D5FA308B89F42A59CE45850694256C946645F69640064641172ED232C26415652396B0004\\"}, \\"operationType\\": \\"delete\\", \\"clusterTime\\": {\\"$timestamp\\": {\\"t\\": 1678869993, \\"i\\": 1}}, \\"wallTime\\": {\\"$date\\": 1678869993393}, \\"ns\\": {\\"db\\": \\"Tutorial1\\", \\"coll\\": \\"orders\\"}, \\"documentKey\\": {\\"_id\\": {\\"$oid\\": \\"641172ed232c26415652396b\\"}}}"}
-------------------------
% Reached end of topic Tutorial1.orders [0] at offset 4
insert와 update, delete 이벤트가 감지되어 토픽에 잘 들어온 것을 확인할 수 있다.
몽고디비 공식 문서에서 제공하는 튜토리얼을 이용하여 mongodb Change Event 감지를 kafka connect로 구현해보았다.
문제를 해결하며 완성한 예제는
https://github.com/suhongkim98/kafka-edu
에 올려놓았다.