Apache Kafka Connect

이원석·2023년 10월 19일
0

ApacheKafka

목록 보기
2/3
post-thumbnail
본 내용은 인프런의 이도원님의 SpringCloud 강의를 참고하여 작성되었습니다.



1. Kafka Connect (Source or Sink)

Data를 Kafka와 연결하여 Import/Export 하기 위해서는 Kafka Connect가 필요하다! Kafka Connect는 코드 없이 Configuration만으로 데이터를 이동이 가능하다.

Kafka Connect를 사용하면 데이터 변환 및 전처리 작업을 수행할 수 있으며, 실시간 데이터 스트림 처리를 지원하므로 데이터가 생성되는 즉시 처리 및 분석이 가능하다. 데이터 형식을 변환하거나 데이터를 필터링하거나 데이터의 품질을 향상시킬 수 있다.

또한 Database와 연결하기 위한 JDBC 커넥터의 설치가 필요하다.

ex)
OrderService Database(1) 변경감지 → Kafaka Connect Source → Kafka Cluster → Kafka Connect Sink → OrderService Databsee(2) 데이터 동기화



1-1. Kafka Connect 설치

<Mac OS 기준>
- curl -O http://packages.confluent.io/archive/5.5/confluent-community-5.5.2-2.12.tar.gz
- curl -O http://packages.confluent.io/archive/5.5/confluent-community-6.1.0.tar.gz
- cd $KAFKA_CONNECT_HOME

Kafka Connect 설정 (기본)

$KAFKA_HOME/config/connect-distributed.properties

Kafka Connect 실행

./bin/connect-distributed ./etc/kafka/connect-distributed.properties

Topic 목록 확인

./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

Kafka Console Cosumer 로그 확인

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my_topic_xxx --from-beginning



1-2. JDBC 커넥터 설정

Kafka Connect의 설정파일 (connect-distributed.properties)에 JDBC lib 디렉토리를 추가해주자!


1-3. maridaDB 연결

Kafka Connect의 share/java/kafka 디렉토리에 사용할 버전의 mariaDB의 jar 파일을 추가해주자!



2. Kafka Connect Source

Kafka Connect Source에 jdbc 정보를 등록하면 해당 Database의 테이블에 대한 변경 감지가 가능하다.

등록한 DB의 테이블에 변경사항이 적용될 시, Connect Source에서 이를 감지하고 Kafka Cluster의 Topic에 메시지를 전송한다.

Topic을 구독한 Kafka Connect Sink 에서 메시지를 읽어, 변경 사항을 다른 Database에 동기화 하는 작업을 구현해보자!

2-1. Kafka Connect Server를 구동

./bin/connect-distributed ./etc/kafka/connect-distributed.properties

2-2. Kafka Connect Source 등록

구동시킨 Kafka Connect Server(http://localhost:8083/connectors) 에 POST 형식으로 아래의 JSON을 전송하자.

{
    // 이 커넥터의 이름
    "name": "my-source-connect",

    // 커넥터 설정
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": "jdbc:mysql://localhost:3306/userdb",
        "connection.user": "root",
        "connection.password": "1234",

        // 모드 설정
        "mode": "incrementing",

        // 증가 열 이름 설정
        "incrementing.column.name": "id",

        // 테이블 화이트리스트 설정
        "table.whitelist": "user",

        // 토픽 접두사 설정
        "topic.prefix": "my_topic_",

        // 최대 작업 수 설정
        "tasks.max": "1"
    }
}

Kafka Connect Server(http://localhost:8083/connectors/connector-name) 에 GET 형식의 요청을 보내면 자세한 Connect Source에 대한 정보를 확인할 수 있다.


2-3. DB 데이터 추가

	"connection.url": "jdbc:mysql://localhost:3306/userdb",
	...
	"table.whitelist": "user",
	"topic.prefix" : "my_topic_",

user 데이터를 추가해보자!

mydb의 user 테이블에 새로운 데이터를 추가하면 my_topic_user 라는 새로운 Topic이 등록되며, user 테이블에서 발생한 변경사항 메시지가 해당 Topic에 저장된다.


2-4. Kafka Console Consumer

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my_topic_user --from-beginning

위의 명령어를 통해 Console Cosumer에 my_topic_user라는 이름으로 등록된 Topic의 메시지들을 조회할 수 있다.



3. Kafka Sink Connect

{
    "name": "my-sink-connect",

    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",

        // 데이터베이스 연결 정보
        "connection.url": "jdbc:mysql://localhost:3306/userdb",
        "connection.user": "root",
        "connection.password": "1234",

        // 새로운 테이블 자동 생성
        "auto.create": "true",

        // 테이블 스키마 자동 업데이트
        "auto.evolve": "true",

        // 레코드 삭제 비활성화
        "delete.enabled": "false",

        // 작업 수 최대 1개로 설정
        "tasks.max": "1",

        // 구독할 토픽 이름
        "topics": "my_topic_user"
    }
}

Kafka Sink Connect에 해당 JSON 내용을 POST하게 되면 Kafka-Conseol-Consumer 는 my_topic_users Topic을 구독하게 된다.

구독한 Topic에 새로운 메시지가 추가되면 Kafka Sink Connect가 Topic 이름으로 데이터 동기화를 하기 위한 테이블을 생성한다. 이후 Topic에 저장된 메시지들을 테이블에 저장한다.

Sink Connect에 연결하니 Kafka Console Consumer 의 my_topic_user 토픽을 구독하며 그 동안 저장된 모든 메시지들을 참조하여 my_topic_user 테이블을 생성하고 데이터를 저장했다!



4. Delete Sink or Source Connector

등록한 Kafka Connection (Sink, Source) 의 설정정보를 삭제하고 싶다면

DELETE http://localhost:8083/connectors/connection-name 요청으로 삭제가 가능하다.




참고문헌
Inflearn: Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA) 강의자료

0개의 댓글

관련 채용 정보