[ kafka ] Mac OS - kafka 구축 / kafka Connector

ma.caron_g·2022년 10월 23일
0

Kafka

목록 보기
1/1
post-thumbnail
  • 참고 링크

Kafka 설치 : https://velog.io/@kimview/MacOS-kafka-%EC%84%A4%EC%B9%98

Kafka Connector : https://cjw-awdsd.tistory.com/53

[ kafka ]

카프카(kafka)란, 대용량, 대규모 메시지 데이터를 빠르게 처리할 수 있도록 개발된 분산 메시징 플랫폼으로

Producer(생산자)와 Consumer(소비자)를 통해 데이터 파이프라인을 만들 수 있다.

[ kafka 설치 ]

[ wget을 이용한 설치 ]

Terminal을 이용해서 설치할 것이므로 wget을 설치해줍니다.

$ brew install wget

명령어를 이용하여 brew를 통해 wget을 설치합니다.

$ wget https://archive.apache.org/dist/kafka/2.8.0/kafka_2.13-2.8.0.tgz

wget을 이용해서 카프카 파일을 다운 받습니다.

$ tar xvf kafka_2.13-2.8.0.tgz

명령어를 입력하고 다운 받은 kafka 파일을 압축해제합니다.

[ zookeeper 구동 ]

$ bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

[ kafka 구동 ]

$ bin/kafka-server-start.sh -daemon config/server.properties

[ 구동 확인 ]

$ netstat -an | grep 2181

명령어를 입력하면

다음과 같이 구동되고 있는 것을 확인할 수 있습니다.

[ Kafka Connect 설치 ]

kafka connect를 실행합니다.

$ ./bin/connect-distributed.sh ..confluent-6.1.0/etc/kafka/connect-distributed.properties

실행 후 (창 끄면 안 됩니다.) topic 리스트를 확인하는 명령어를 입력해줍니다.

./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

를 입력하면

[2022-10-22 16:30:01,228] INFO [GroupCoordinator 0]: Stabilized group connect-cluster generation 3 (__consumer_offsets-13) with 1 members (kafka.coordinator.group.GroupCoordinator)
[2022-10-22 16:30:01,229] INFO [GroupCoordinator 0]: Assignment received from leader for group connect-cluster for generation 3. The group has 1 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)

Connect 터미널에서 다음과 같은 내용이 추가됩니다.

maseunghyun@maseunghyeon-ui-MacBookPro kafka_2.13-2.8.0 % ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
__consumer_offsets
connect-configs
connect-offsets
connect-status
quickstart-events

list를 확인한 터미널에서는 토픽 리스트를 보여줍니다.

[ kafka Connector ] 설치

https://docs.confluent.io/5.5.1/connect/kafka-connect-jdbc/index.html

다음 링크로 이동해서 Connector를 다운 받아줍니다.

아래로 내리면 ZIP 파일을 다운 받을 수 있습니다.

[ MySQL Connector ]

데이터베이스에 연결하기위해 Connector를 다운 받아줍니다.

https://dev.mysql.com/downloads/connector/j/

다운 받은 jar파일을
kafka-connetor 디렉토리/share/java/kafka에 복사해줍니다.

이제 REST API를 통해 Source Connector와 Sink Connector를 생성해보겠습니다.

POST Man을 이용하여
POST 방식으로 다음과 같이 작성합니다

http://localhost:8083/connectors

{
    "name": "my-source-connect",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": "jdbc:mysql://localhost:3306/[본인 DB명]",
        "connection.user":"[유저 아이디]",
        "connection.password":"[비밀번호]",
        "mode":"[데이터 polling 방식]",
        "incrementing.column.name" : "[ mode가 incrementing일 때, 자동 증가할 column 이름",
        "table.whitelist" : "[데이터 변경을 감지할 table 이름]",
        "topic.prefix" : "[kafka 토픽에 저장할 토픽 이름 + 위에 table.whitelist가 붙음]",
        "tasks.max" : "1",
    }
}

실행 후 GET 방식으로 링크의 정보를 가져오면

POST했던 name 값이 등록 돼 있습니다.

[ Sink Connector ]

Source Connector를 통해 topic에 넣은 데이터를 Sink하기 위해 Sink Connector를 생성해줍니다.

Source Connecotr과 동일하게

{
    "name": "my-pksink-connect",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url": "jdbc:mysql://localhost:3306/[DB명]",
        "connection.user":"[유저 아이디]",
        "connection.password":"[비밀번호]",
        "auto.create":"true",
        "auto.evolve":"true",
        "delete.enabled":"false",
        "tasks.max":"1",
        "topics":"[토픽 이름]"
    }
}

[ ln -s ]

디렉토리에 대한 심볼릭 링크
카프카 폴더로 이동하려고하면
저는 kafka_2.~ 폴더와 kafka_2.~.tgz 가 같이 있어 tab을 눌러도 자동완성이 되질 않는 경우가 존재합니다.
이를 방지하기 위해 kafk_2.~ 폴더를 심볼릭 링크를 걸어 tab을 눌러 간단하게 폴더에 접근할 수 있도록 만들었습니다.

ln -s [대상폴더명] [심볼명]

ln -s kafka_2.13-2.8.0 kafka

그럼 이제 cd kafka를 입력하면 kafka_2.13-2.8.0 폴더로 이동하게 됩니다.

maseunghyun@maseunghyeon-ui-MacBookPro ~ % cd kafka
maseunghyun@maseunghyeon-ui-MacBookPro kafka % ls
LICENSE		NOTICE		bin		config		libs		licenses	logs		site-docs
maseunghyun@maseunghyeon-ui-MacBookPro kafka %  
profile
다른 사람이 만든 것을 소비하는 활동보다, 내가 생산적인 활동을 하는 시간이 더 많도록 생활화 하자.

0개의 댓글