Kafka와 외부 데이터 소스나 저장소 간에 데이터를 효율적으로 이동시키기 위한 프레임워크
이를 통해 kafka 토픽에 저장되는 데이터를 실시간으로 감지해서 추출할 수 있다.
- 플러그 가능한 아키텍처: Kafka Connect는 다양한 소스와 싱크를 위한 플러그인(커넥터)을 지원한다. 커넥터는 오픈소스로 제공되거나 커뮤니티나 개발자에 의해 만들어질 수 있다.
- 분산 및 확장성: Kafka Connect는 분산 환경에서 실행될 수 있다. 이는 데이터 처리량이 늘어나거나 시스템이 복잡해져도 쉽게 확장 가능하다는 것을 의미
- 오프셋 관리: 내부적으로 Kafka에서 오프셋을 관리하기 때문에, 장애가 발생했을 때도 데이터 무결성을 유지할 수 있다.
- 설정 기반의 실행: 코드를 작성할 필요 없이 간단한 JSON 또는 YAML 파일로 Kafka Connect를 설정할 수 있다.
- 스트림과 배치 모드 지원: Kafka Connect는 실시간 스트림 처리뿐만 아니라, 배치 작업도 지원
결과적으로, Kafka Connect를 이용하면 코드를 작성하지 않고 설정만으로 다양한 외부 시스템과 Kafka 간의 데이터 이동을 처리할 수 있어 운영적인 편의성이 높다.
- connect는 외부 시스템과 kafka의 데이터 이동 작업을 처리하는 프레임워크
- connector는 connect 프레임워크 안에서 실행되는 플러그인으로 로직을 담당, 커넥터는 소스 커넥터와 싱크 커넥터로 구분된다.
- 소스 커넥터(Source Connector): 외부 시스템(예: RDBMS, 로그 파일, 외부 API 등)에서 데이터를 가져와 Kafka 토픽으로 전달
- 싱크 커넥터(Sink Connector): Kafka 토픽에서 데이터를 가져와 외부 시스템(예: RDBMS, ElasticSearch, HDFS 등)으로 전달
- ec2 인스턴스 환경에서 도커 컴포즈를 통해 kafka 클러스터를 만들어 놓은 뒤의 작업과정임을 유의
- kafka 패키지의 일부이므로 따로 설치할 필요없이 설정 파일 수정을 통해 실행 가능
kafka-connect:
image: confluentinc/cp-kafka-connect:5.5.1
depends_on:
- kafka-1
- kafka-2
- kafka-3
ports:
- "8083:8083"
environment:
CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/java/kafka-connect-elasticsearch"
CONNECT_CONFIG_STORAGE_TOPIC: "my_connect_configs"
CONNECT_OFFSET_STORAGE_TOPIC: "my_connect_offsets"
CONNECT_STATUS_STORAGE_TOPIC: "my_connect_statuses"
CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_BOOTSTRAP_SERVERS: kafka-1:29092,kafka-2:29093,kafka-3:29094
CONNECT_REST_PORT: 8083
CONNECT_REST_ADVERTISED_HOST_NAME: "localhost"
CONNECT_GROUP_ID: compose-connect-group
docker-compose 파일에 위의 kafka connect 내용 추가 후 실행
Confluent Hub를 통해서 설치. Confluent Hub는 Kafka Connect에서 사용되는 다양한 플러그인을 간편하게 다운로드할 수 있는 저장소
# 폴더 만들고 이동
$ mkdir confluent-hub
$ cd confluent-hub
# 현재 경로에 다운로드하고 압축해제
$ curl -L -O http://client.hub.confluent.io/confluent-hub-client-latest.tar.gz
$ tar -zxvf confluent-hub-client-latest.tar.gz
# 하위 폴더 추가
$ mkdir plugins
# PATH 환경변수 추가
$ vi ~/.bashrc
export CONFLUENT_HOME='~/confluent-hub'
export PATH=$PATH:$CONFLUENT_HOME/bin
$ source ~/.bashrc
# Elasticsearch Connector 설치
$ cd ./confluent-hub/bin
$ confluent-hub install confluentinc/kafka-connect-elasticsearch:5.5.1 --no-prompt --component-dir /home/ubuntu/confluent-hub/plugins
# kafka-connect 부분에 아래 내용 추가
volumes:
- /home/ubuntu/confluent-hub/plugins/confluentinc-kafka-connect-elasticsearch:/usr/share/java/kafka-connect-elasticsearch
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "elasticsearch-sink",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"topics": "coupang",
"key.ignore": "true",
"schema.ignore": "true",
"connection.url": "http://{ip주소}:9200",
"type.name": "kafka-connect",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false"
}
}'