카프카 커넥트는 카프카 오픈소스에 포함된 툴 중 하나로 데이터 파이프라인 생성 시 반복 작업을 줄이고 효율적인 전송을 이루기 위한 애플리케이션이다. 파이프라인마다 프로듀서, 컨슈머를 개발하는 것은 비효율적이다. 커넥트는 특정 작업 형태를 템플릿으로 만들어놓은 커넥터를 실행함으로써 반복 작업을 줄일 수 있다.
커넥터의 경우 소스 커넥터, 싱크 커넥터 2가지로 나뉘며 소스의 경우 데이터를 카프카 토픽으로 전송하는 프로듀서 역할, 싱크의 경우 토픽의 데이터를 파일로 저장하는 컨슈머 역할을 한다.
단일 모드 커넥트를 참조하는 설정 파일
# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors
bootstrap.servers
key.converter
, value.converter
key.converter.schemas.enable=false
or value.converter.schemas.enable=false
로 설정offset.storage.file.filename
offset.flush.interval.ms
plugin.path
파일 소스 커넥터
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test
name
connector.class
task.max
file
topic
읽은 파일의 데이터를 저장할 토픽 이름$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties \
$KAFKA_HOME/config/connect-file-source.properties
단일 모드 커넥트와 다르게 2개 이상의 프로세스가 1개의 그룹으로 묶여서 운영된다.
# A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
bootstrap.servers=localhost:9092
# unique name for the cluster, used in forming the Connect cluster group. Note that this must not conflict with consumer group IDs
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
#status.storage.partitions=5
offset.flush.interval.ms=10000
# List of comma-separated URIs the REST API will listen on. The supported protocols are HTTP and HTTPS.
# Specify hostname as 0.0.0.0 to bind to all interfaces.
# Leave hostname empty to bind to default interface.
# Examples of legal listener lists: HTTP://myhost:8083,HTTPS://myhost:8084"
#listeners=HTTP://:8083
# The Hostname & Port that will be given out to other workers to connect to i.e. URLs that are routable from other servers.
# If not set, it uses the value for "listeners" if configured.
#rest.advertised.host.name=
#rest.advertised.port=
#rest.advertised.listener=
plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
bootstrap.servers
group.id
key.converter
, value.converter
key.converter.schemas.enable=false
or value.converter.schemas.enable=false
로 설정offset.storage.topic
offset.flush.interval.ms
plugin.path
소스 커넥터는 소스 애플리케이션 또는 소스 파일로부터 데이터를 가져와 토픽으로 넣는 역할을 한다.
필요한 클래스는 2개로 아래와 같다.