
먼저 Kafka는 Producer와 Consumer를 통해 데이터 파이프라인을 만들 수 있다. 예를 들어 A서버의 DB에 저장한 데이터를 Kafka Producer/Consumer를 통해 B서버의 DB로도 보낼 수 있다. 이러한 파이프라인이 여러개면 매번 반복적으로 파이프라인을 구성해야줘야한다. KafkConnect는 이러한 반복적인 파이프라인 구성을 쉽고 간편하게 만들 수 있게 만들어진 Apache Kafka 프로젝트중 하나다.
위 사진을 보면 Kafka Connect를 이용해 왼쪽의 DB의 데이터를 Connect와 Source Connector를 사용해 Kafka Broker로 보내고 Connect와 Sink Connector를 사용해 Kafka에 담긴 데이터를 DB에 저장하는 것을 알 수 있다.
여기서 중요한 건 Connect와 Connector의 차이와 Source Connector와 Sink Connector이다.
Connect: Connector를 동작하게 하는 프로세서(서버)
Connector: Data Source(DB)의 데이터를 처리하는 소스가 들어있는 jar파일
Source Connector: data source에 담긴 데이터를 topic에 담는 역할(Producer)을 하는 connector
Sink Connector: topic에 담긴 데이터를 특정 data source로 보내는 역할(Consumer 역할)을 하는 connector
또한 Connect는 단일 모드(Standalone)와 분산 모드(Distributed)로 이루어져있다.
단일 모드(Standalone): 하나의 Connect만 사용하는 모드
분산 모드(Distributed): 여러개의 Connect를 한개의 클러스트로 묶어서 사용하는 모드.
-> 특정 Connect가 장애가 발생해도 나머지 Connect가 대신 처리하도록 함
Kafka Connect는 REST API를 사용해서 Connector를 등록 및 사용할 수 있다. 이제 예제를 한번 해보자.
이번 예제에서 mysql table에 데이터를 insert하면 다른 table에 데이터가 그대로 저장되는 예제를 해본다.
먼저 선작업으로 DB에 Table을 하나 만들자.
CREATE SCHEMA test;
CREATE TABLE test.users (
id INT PRIMARY KEY AUTO_INCREMENT,
name VARCHAR(20)
)
vi config/server.properties$ ./bin/zookeeper-server-start.sh ./config/zookeeper.properties
$ ./bin/kafka-server-start.sh ./config/server.properties
$ ./bin/connect-distributed ./etc/kafka/connect-distributed.properties$ ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list$ vi etc/kafka/connect-distributed.properties{
"name": "my-source-connect",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:mysql://localhost:3306/test",
"connection.user":"root",
"connection.password":"비밀번호",
"mode":"incrementing",
"incrementing.column.name" : "id",
"table.whitelist" : "users",
"topic.prefix" : "example_topic_",
"tasks.max" : "1",
}
}
cUrl -X POST -d @- http://localhost:8083/connectors --header "content-Type:application/json"
각 속성의 의미는 다음과 같다.
cUrl -X GET -d @- http://localhost:8083/connectors$ ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list{
"name": "my-pksink-connect",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:mysql://localhost:3306/test",
"connection.user":"root",
"connection.password":"비밀번호",
"auto.create":"true",
"auto.evolve":"true",
"delete.enabled":"false",
"tasks.max":"1",
"topics":"example_topic_users"
}
}
cUrl -X POST -d @- http://localhost:8083/connectors --header "content-Type:application/json"
sourceConnector와 겹치는 속성을 제외한 속성은 다음과 같은 뜻을 가진다
더 자세한 속성은 해당 링크에서 확인할 수 있다.
https://docs.confluent.io/kafka-connect-jdbc/current/sink-connector/sink_config_options.html
생성후 users table에 데이터를 insert하면
example_topic_users table이 생성된 것을 볼 수 있고
서로 테이블의 내용이 같은 것을 확인할 수 있다.