단순히 데이터 전달 뿐만 아니라 데이터의 지연 시간을 낮춰주는 용도로 사용한다.
Apache Kafka 홈페이지에 들어가서 다운로드를 받으면 된다.
1. 주키퍼 실행 (2181 포트)
./bin/zookeeper-server-start.sh ./config/zookeeper.properties
2. 카프카 메인 서버 실행 (9092 포트)
./bin/kafka-server-start.sh ./config/server.properties
토픽 확인
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
3. 토픽 생성 (토픽명 : quickstart-events)
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic quickstart-events --partitions 1
토픽 확인 명령어 쳐보면 quickstart-events 토픽이 생성된 것을 확인
4. 토픽 상세 정보 보기
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic quickstart-events
5. Producer 와 Consumer 역할을 해보자.
Producer 등록
/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic quickstart-events
Consumer 등록
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic quickstart-events --from-beginning
모든 메시지를 처음부터 얻기 위한 옵션 추가 (--from-beggining)
SourceSystem -> Kafka Connect Source -> Kafka Cluster -> Kafka Connect Sink -> Target System(S3...)
터미널에서 유레카 서비스 폴더로 이동한다.
그런후 order-service 에 가서 Mysql 의존성을 추가한다.
생성한 테이블에 Kafka Connect 를 이용해서 users 테이블에 데이터가 insert 되면 그 데이터를 감지했다가 새로운 데이터베이스에 옮기는 작업을 해보자.
curl -O http://packages.confluent.io/archive/5.5/confluent-community-5.5.2-2.12.tar.gz
curl -O http://packages.confluent.io/archive/6.1/confluent-community-6.1.0.tar.gz
- tar xvf confluent-community-6.1.0.tar.gz
Kafka 폴더에서 명령어로 실행
Kafka Connect 설정 (기본으로 사용)
/config/connect-distributed.properties
Kafka Connect 실행
./bin/connect-distributed ./etc/kafka/connect-distributed.properties
Topic 목록 화인
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
JAVA 에서 관계형 데이터 베이스를 사용하기 위해서 JDBC 라이브러리를 설치해야한다.
주키퍼 서버 + 카프 메인 서버를 모두 실행 시키고 위에 명령어에서 Kafka Connect 를 실행하면 된다.
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
실행결과
__consumer_offsets
connect-configs
connect-offsets
connect-status
quickstart-events
그런후에 설치한 kafka-connect-jdbc 파일에 가서 lib 폴더까지 들어간 후 그 경로를 복사한다.
마지막으로 m2/jdbc/mariadb-java-client/version/ 에 들어가서 mariadb-java-client-version.jar 파일을 /confluent-6.1.0/share/java/kafka 로 옮겨주면 된다.
이제 Kafka 를 사용하기 위한 사전 준비는 다 했고 Kafka Source Connect 테스트를 해보자.
SourceSystem -> Kafka Connect Source -> Kafka Cluster -> Kafka Connect Sink -> Target System(S3...)
{
"name" : "my-source-connect",
"config" : {
"connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url":"jdbc:mysql://localhost:3306/mydb",
"connection.user":"root",
"connection.password":"password",
"mode": "incrementing",
"incrementing.column.name" : "id",
"table.whitelist":"users",
"topic.prefix" : "my_topic_",
"tasks.max" : "1"
}
}
테스트를 할 때 MariaDB 에서 데이터를 추가하면 consumer 를 실행했을 때 그 데이터베이스에서 추가한 내용이 출력이 되는지 확인한다.
그런 후 토픽에 잘 보내졌는가? 실제 소스가 만들어졌는가? 를 확인해보면 된다.
127.0.0.1:8083/connectors GET 으로 source 를 볼 수 있고
127.0.0.1:8083/connectors/source명/status GET 으로 요청하면 더 자세하게 볼 수 있다.
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
insert into users(user_id , pwd , name) values('user1' , 'test1111' , 'Username')
실행 결과
__consumer_offsets
connect-configs
connect-offsets
connect-status
my_topic_users
quickstart-events
./bin/kafka-console-consumer.sh --bootstrap-server
localhost:9092 --topic my_topic_users --from-beginning
실행결과
{
"schema":{
"type":"struct",
"fields":[
{"type":"int32","optional":false,"field":"id"},
{"type":"string","optional":true,"field":"user_id"},
{"type":"string","optional":true,"field":"pwd"},
{"type":"string","optional":true,"field":"name"},
{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,
"field":"created_at"}],
"optional":false,
"name":"users"},
"payload":{
"id":1,
"user_id":"user1",
"pwd":"test1111",
"name":"Username",
"created_at":1709140788000
}
}
}
DB 에 직접 데이터를 전달하지 않고 Topic 에 데이터를 전달하기만 하면 그 값이 DB 에 넣을 수 있도록 해야한다.
위와 같은 작업을 해주는 것이 Sink Connect 이다.
Source Connect 에서 했던 것 처럼 JSON 형태로 요청을 보내는 형식으로 해본다.
MariaDB 에서 데이터를 추가 하면 my_Db.my_topic_users 에도 들어가는지 확인한다.
Kafka Producer 를 이용해서 Kafka Topic 에 데이터 직접 전송
{
"name":"my-sink-connect",
"config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url":"jdbc:mysql://localhost:3306/mydb",
"connection.user":"root",
"connection.password":"test1357",
"auto.create":"true",
"auto.evolve":"true",
"delete.enabled":"false",
"tasks.max":"1",
"topics":"my_topic_users"
}
}
sink 가 정상적으로 만들어졌다는 것은 지금 우리가 가지고 있는 데이터베이스에 topic 의 이름과 같은 형태의 테이블이 생성되었다는 것을 확인한다.
즉 지금까지 흐름을 보면 MariaDB 에다가 insert 쿼리를 보내면 topic 에 데이터가 전달이 되고 이 topic 에서 다른 Target Table 에 들어가는 것을 확인한 것이다.
이번에는 Producer 에서 직접 콘솔로 데이터를 넣어보자.
(kafka file)
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic_users
다음에는 이렇게 배운 Source Connect + Sink Connect 를 이용해서 전에 만들었던 order-service , catalog-service 에도 적용해보자.