Kafka Conncet

이재철·2021년 10월 10일
0

MSA

목록 보기
8/13
  • Kafka Conncet를 통해 Data를 Import/Export 가능
  • 코드 없이 Configuration으로 데이터를 이동
  • Standalone mode, Distribution Mode 지원
    • RESTful API 통해 지원
    • Stream 또는 Batch 형태로 데이터 전송 가능
    • 커스텀 Connector를 통한 다양한 플러그인 제공 (File, S3, Hive, MySQL...)
  • Kafka Conncet Source : 데이터를 가져오는 쪽
  • Kafka Conncet Sink : 데이터를 보내는 쪽
    • 토픽에 등록된 데이터 파일을 타겟 시스템쪽으로 옮겨주는 역할

Kafka Conncet 설치

  • 필자는 모든 설치파일을 /Users/lee/development 경로 안에 저장했음.
curl -O http://packages.confluent.io/archive/5.5/confluent-community-5.5.2-2.12.tar.gz
  or
curl -O http://packages.confluent.io/archive/6.1/confluent-community-6.1.0.tar.gz
tar xvf confluent-community-6.1.0.tar.gz
cd  confluent-community-6.1.0

실행
./bin/connect-distributed ./etc/kafka/connect-distributed.properties
* confluent-community-6.1.0 폴더 내에서
code ./etc/kafka/connect-distributed.properties

  • JdbcSourceConnector에서 mysql 사용하기 위해 드라이버 복사
mvn 의 빌드된 jar들은 .m에 다있음.
cd /Users/lee/.m2/repository/mysql/mysql-connector-java/8.0.26

 cp ./mysql-connector-java-8.0.26.jar /Users/lee/development/kafka/confluent-6.1.0/share/java/kafka

Kafka Conncet 사용

1. Kafka Connce Source 등록

  • Kafka Source Conncet 등록 (포스트맨을 사용)
  • 등록
POST / http://localhost:8083/connectors
Type application/json
data
{
    "name" : "my-source-connect",
    "config" : {
    "connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url":"jdbc:mysql://localhost:3306/mydb",
    "connection.user":"root",
    "connection.password":"1234",
    "mode": "incrementing",
    "incrementing.column.name" : "id",
    "table.whitelist":"users",
    "topic.prefix" : "my_topic_",
    "tasks.max" : "1"
    }
}
  • 목록
GET / http://localhost:8083/connectors

  • 상세 확인
http://localhost:8083/connectors/my-source-connect/status

  • 이제 테이블에 값을 넣을 때마다 메시지를 보내는 것을 볼 수 있음.
    (처음에는 바뀐 것이 없기 때문에 my_topic_users 가 없음)
insert into users(user_id, pwd, name) values('user', '1234', 'name');
insert into users(user_id, pwd, name) values('user2', '1234', 'name2');

2. Kafka Connce Sink 등록

  • Kafka Sink Conncet 등록 (포스트맨을 사용)
  • 등록
POST / http://localhost:8083/connectors
Type application/json
data
{
    "name":"my-sink-connect",
    "config":{
    "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url":"jdbc:mysql://localhost:3306/mydb",
    "connection.user":"root",
    "connection.password":"1234",
    "auto.create":"true",
    "auto.evolve":"true",
    "delete.enabled":"false",
    "tasks.max":"1",
    "topics":"my_topic_users"
    }
}
  • 목록 및 상세 확인은 동일함.
  • Sink 등록 후 DB를 보면 테이블 하나가 생성된 것과 데이터가 들어가 있는 것을 볼 수있음.
    테이블 생성이 됨
    users 테이블 값
    생성된 테이블 값
  • 위의 이미지를 보면 users와 my_topic_users 가 동일한 데이터가 있는 것을 볼 수 있음.
  • producer에서 값 보내기
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"user_id"},{"type":"string","optional":true,"field":"pwd"},{"type":"string","optional":true,"field":"name"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"created_at"}],"optional":false,"name":"users"},"payload":{"id":7,"user_id":"user5","pwd":"7777","name":"name7","created_at":1633890976000}}


  • producer에서 값을 보내면 위의 이미지처럼 users에는 값이 들어가지 않고 my_topic_users에만 값을 들어 가는 것을 볼 수 있음
  • 이유 : user 토픽으로 부터 데이터를 가져오는 게 아니라 자신이 가지고 있는 데이터를 my_topic_users에 데이터를 넣는 역할임

에러

  • 만약 producer에서 값을 잘못 보내면 sink가 에러로 죽음!!
  • 해결 방안으로 토픽을 삭제 해주고 다시 실행! 또는 토픽 초기화를 해주면 됨.

토픽 삭제

  • config/server.properties 에서 다음을 추가
delete.topic.enable=true
  • 카프카 재실행 후 다음 커맨드 실행
./bin/kafka-topics.sh --delete --bootstrap-server localhost:9092  --topic my_topic_users
  • 토픽 초기화는 아래 메소드를 통해서 할 수 있는 듯 하다

Kafka Conncet 관련 메소드

- GET /connectors - return a list of active connectors
- POST /connectors - create a new connector; the request body should be a JSON object containing a string name field and an object config field with the connector configuration parameters
- GET /connectors/{name} - get information about a specific connector
- GET /connectors/{name}/config - get the configuration parameters for a specific connector
- PUT /connectors/{name}/config - update the configuration parameters for a specific connector
- GET /connectors/{name}/status - get current status of the connector, including if it is running, failed, paused, etc., which worker it is assigned to, error information if it has failed, and the state of all its tasks
- GET /connectors/{name}/tasks - get a list of tasks currently running for a connector
- GET /connectors/{name}/tasks/{taskid}/status - get current status of the task, including if it is running, failed, paused, etc., which worker it is assigned to, and error information if it has failed
- PUT /connectors/{name}/pause - pause the connector and its tasks, which stops message processing until the connector is resumed
- PUT /connectors/{name}/resume - resume a paused connector (or do nothing if the connector is not paused)
- POST /connectors/{name}/restart - restart a connector (typically because it has failed)
- POST /connectors/{name}/tasks/{taskId}/restart - restart an individual task (typically because it has failed)
- DELETE /connectors/{name} - delete a connector, halting all tasks and deleting its configuration
- GET /connectors/{name}/topics - get the set of topics that a specific connector is using since the connector was created or since a request to reset its set of active topics was issued
- PUT /connectors/{name}/topics/reset - send a request to empty the set of active topics of a connector

0개의 댓글