- 참고 링크
Kafka 설치 : https://velog.io/@kimview/MacOS-kafka-%EC%84%A4%EC%B9%98
Kafka Connector : https://cjw-awdsd.tistory.com/53
카프카(kafka)란, 대용량, 대규모 메시지 데이터를 빠르게 처리할 수 있도록 개발된 분산 메시징 플랫폼으로
Producer(생산자)와 Consumer(소비자)를 통해 데이터 파이프라인을 만들 수 있다.
Terminal을 이용해서 설치할 것이므로 wget을 설치해줍니다.
$ brew install wget
명령어를 이용하여 brew를 통해 wget을 설치합니다.
$ wget https://archive.apache.org/dist/kafka/2.8.0/kafka_2.13-2.8.0.tgz
wget을 이용해서 카프카 파일을 다운 받습니다.
$ tar xvf kafka_2.13-2.8.0.tgz
명령어를 입력하고 다운 받은 kafka 파일을 압축해제합니다.
$ bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
$ bin/kafka-server-start.sh -daemon config/server.properties
$ netstat -an | grep 2181
명령어를 입력하면
다음과 같이 구동되고 있는 것을 확인할 수 있습니다.
kafka connect를 실행합니다.
$ ./bin/connect-distributed.sh ..confluent-6.1.0/etc/kafka/connect-distributed.properties
실행 후 (창 끄면 안 됩니다.) topic 리스트를 확인하는 명령어를 입력해줍니다.
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
를 입력하면
[2022-10-22 16:30:01,228] INFO [GroupCoordinator 0]: Stabilized group connect-cluster generation 3 (__consumer_offsets-13) with 1 members (kafka.coordinator.group.GroupCoordinator)
[2022-10-22 16:30:01,229] INFO [GroupCoordinator 0]: Assignment received from leader for group connect-cluster for generation 3. The group has 1 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)
Connect 터미널에서 다음과 같은 내용이 추가됩니다.
maseunghyun@maseunghyeon-ui-MacBookPro kafka_2.13-2.8.0 % ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
__consumer_offsets
connect-configs
connect-offsets
connect-status
quickstart-events
list를 확인한 터미널에서는 토픽 리스트를 보여줍니다.
https://docs.confluent.io/5.5.1/connect/kafka-connect-jdbc/index.html
다음 링크로 이동해서 Connector를 다운 받아줍니다.
아래로 내리면 ZIP 파일을 다운 받을 수 있습니다.
데이터베이스에 연결하기위해 Connector를 다운 받아줍니다.
https://dev.mysql.com/downloads/connector/j/
다운 받은 jar파일을
kafka-connetor 디렉토리/share/java/kafka
에 복사해줍니다.
이제 REST API를 통해 Source Connector와 Sink Connector를 생성해보겠습니다.
POST Man을 이용하여
POST 방식으로 다음과 같이 작성합니다
http://localhost:8083/connectors
{
"name": "my-source-connect",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:mysql://localhost:3306/[본인 DB명]",
"connection.user":"[유저 아이디]",
"connection.password":"[비밀번호]",
"mode":"[데이터 polling 방식]",
"incrementing.column.name" : "[ mode가 incrementing일 때, 자동 증가할 column 이름",
"table.whitelist" : "[데이터 변경을 감지할 table 이름]",
"topic.prefix" : "[kafka 토픽에 저장할 토픽 이름 + 위에 table.whitelist가 붙음]",
"tasks.max" : "1",
}
}
실행 후 GET 방식으로 링크의 정보를 가져오면
POST했던 name 값이 등록 돼 있습니다.
Source Connector를 통해 topic에 넣은 데이터를 Sink하기 위해 Sink Connector를 생성해줍니다.
Source Connecotr과 동일하게
{
"name": "my-pksink-connect",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:mysql://localhost:3306/[DB명]",
"connection.user":"[유저 아이디]",
"connection.password":"[비밀번호]",
"auto.create":"true",
"auto.evolve":"true",
"delete.enabled":"false",
"tasks.max":"1",
"topics":"[토픽 이름]"
}
}
디렉토리에 대한 심볼릭 링크
카프카 폴더로 이동하려고하면
저는 kafka_2.~ 폴더와 kafka_2.~.tgz 가 같이 있어 tab을 눌러도 자동완성이 되질 않는 경우가 존재합니다.
이를 방지하기 위해 kafk_2.~ 폴더를 심볼릭 링크를 걸어 tab을 눌러 간단하게 폴더에 접근할 수 있도록 만들었습니다.
ln -s [대상폴더명] [심볼명]
ln -s kafka_2.13-2.8.0 kafka
그럼 이제 cd kafka를 입력하면 kafka_2.13-2.8.0 폴더로 이동하게 됩니다.
maseunghyun@maseunghyeon-ui-MacBookPro ~ % cd kafka
maseunghyun@maseunghyeon-ui-MacBookPro kafka % ls
LICENSE NOTICE bin config libs licenses logs site-docs
maseunghyun@maseunghyeon-ui-MacBookPro kafka %