Kafka connect

Log·2022년 11월 18일
0
post-thumbnail

kafka connect

카프카 커넥트는 카프카 오픈소스에 포함된 툴 중 하나로 데이터 파이프라인 생성 시 반복 작업을 줄이고 효율적인 전송을 이루기 위한 애플리케이션이다. 파이프라인마다 프로듀서, 컨슈머를 개발하는 것은 비효율적이다. 커넥트는 특정 작업 형태를 템플릿으로 만들어놓은 커넥터를 실행함으로써 반복 작업을 줄일 수 있다.
커넥터의 경우 소스 커넥터, 싱크 커넥터 2가지로 나뉘며 소스의 경우 데이터를 카프카 토픽으로 전송하는 프로듀서 역할, 싱크의 경우 토픽의 데이터를 파일로 저장하는 컨슈머 역할을 한다.

커넥트 실행 방법

  • 단일 모드 커넥트
    • 단일 애플리케이션으로 실행
    • 커넥터를 정의하는 파일을 작성하고 해당 파일을 참조하는 단일 모드 커넥트를 실행함을써 파이프라인 생성
    • 1개 프로세스만 실행되기 때문에 단일 장애점이 될 수 있음
    • 개발환경이나 중요도가 낮은 파이프라인을 운영할 때 사용
  • 분산 모드 커넥트
    • 2대 이상의 서버에서 클러스터 형태로 운영
    • 단일 모드 커넥트 대비 안전하게 운영할 수 있음
    • 데이터 처리양의 변환에도 유연하게 대응 가능

단일 모드 커넥트

connect-stadalone.properties

단일 모드 커넥트를 참조하는 설정 파일

# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=localhost:9092

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

key.converter.schemas.enable=true
value.converter.schemas.enable=true

offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors
  • bootstrap.servers
    커넥트와 연동할 카프카 클러스터의 호스트-포트. 2개 이상의 브로커라면 ,로 구분하여 적으면 된다.
  • key.converter, value.converter
    데이터를 카프카에 저장할 때 혹은 카프카에서 데이터를 가져올 때 변환하는 데에 사용
    JsonConverter, StringConverter, ByteArrayConverter를 기본으로 제공
    만약 사용하고 싶지 않다면 key.converter.schemas.enable=false or value.converter.schemas.enable=false로 설정
  • offset.storage.file.filename
    단일 모드 커넥터는 로컬 파일에 오프셋 정보를 저장하며, 이 정보는 소스 커넥터 또는 싱크 커넥터가 데이터 처리 시점을 저장하기 위해 사용
    해당 정보는 다른 사용자나 시스템이 접근하지 않도록 주의해야 함
  • offset.flush.interval.ms
    태스크가 처리 완료한 오프셋을 커밋하는 주기
  • plugin.path
    플러그인 형태로 추가할 커넥터의 디렉토리 주소로, 커넥터의 jar파일이 위치하는 디렉토리 값을 입력
    커넥터 이외에도 직접 컨버터, 트랜스폼도 플러그인으로 추가할 수 있음
connect-file-source.profile

파일 소스 커넥터

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test
  • name
    커넥터의 이름으로 유일해야 함
  • connector.class
    사용할 커넥터의 클래스 이름
  • task.max
    커넥터로 실행할 태스크 개수로, 늘려서 병렬처리 가능
  • file
    읽을 파일의 위치
  • topic읽은 파일의 데이터를 저장할 토픽 이름
실행
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties \
$KAFKA_HOME/config/connect-file-source.properties 

분산 모드 커넥트

단일 모드 커넥트와 다르게 2개 이상의 프로세스가 1개의 그룹으로 묶여서 운영된다.

connect-distributed.properties
# A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
bootstrap.servers=localhost:9092

# unique name for the cluster, used in forming the Connect cluster group. Note that this must not conflict with consumer group IDs
group.id=connect-cluster

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

key.converter.schemas.enable=true
value.converter.schemas.enable=true

offset.storage.topic=connect-offsets
offset.storage.replication.factor=1

config.storage.topic=connect-configs
config.storage.replication.factor=1

status.storage.topic=connect-status
status.storage.replication.factor=1
#status.storage.partitions=5

offset.flush.interval.ms=10000

# List of comma-separated URIs the REST API will listen on. The supported protocols are HTTP and HTTPS.
# Specify hostname as 0.0.0.0 to bind to all interfaces.
# Leave hostname empty to bind to default interface.
# Examples of legal listener lists: HTTP://myhost:8083,HTTPS://myhost:8084"
#listeners=HTTP://:8083

# The Hostname & Port that will be given out to other workers to connect to i.e. URLs that are routable from other servers.
# If not set, it uses the value for "listeners" if configured.
#rest.advertised.host.name=
#rest.advertised.port=
#rest.advertised.listener=

plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
  • bootstrap.servers
    커넥트와 연동할 카프카 클러스터의 호스트 이름과 포트
  • group.id
    다수의 커넥트 프로세스들을 묶을 그룹 이름
    동일한 group.id로 지정된 경우 커넥트들은 같은 그룹으로 인식
    같은 그룹으로 지정된 커넥트들에서 커넥터가 실행되면 커넥트들에 분산되어 실행
  • key.converter, value.converter
    데이터를 카프카에 저장할 때 혹은 카프카에서 데이터를 가져올 때 변환하는 데에 사용
    JsonConverter, StringConverter, ByteArrayConverter를 기본으로 제공
    만약 사용하고 싶지 않다면 key.converter.schemas.enable=false or value.converter.schemas.enable=false로 설정
  • offset.storage.topic
    분산 모드 커넥트의 경우 카프카 내부 토픽에 오프셋 정보를 저장
    소스 커넥터 또는 싱크 커넥터가 데이터 처리 시점을 저장하기 위해 사용
    실제로 운영시 복제개수를 3이상으로 설정하는 것이 좋음
  • offset.flush.interval.ms
    태스크가 처리 완료한 오프셋을 커밋하는 주기
  • plugin.path
    플러그인 형태로 추가할 커넥터의 디렉토리 주소로, 커넥터의 jar파일이 위치하는 디렉토리 값을 입력
    커넥터 이외에도 직접 컨버터, 트랜스폼도 플러그인으로 추가할 수 있음

소스 커넥터

소스 커넥터는 소스 애플리케이션 또는 소스 파일로부터 데이터를 가져와 토픽으로 넣는 역할을 한다.
필요한 클래스는 2개로 아래와 같다.

  • SourceConnector
    • 테스크를 실행하기 전 커넥터 설정파일을 초기화
    • 어떤 테스크 클래스를 사용할 것인지 정의하는데 사용
  • SourceTask
    • 실제로 데이터를 다루는 클래스
    • 소스 애플리케이션 또는 소스 파일로부터 데이터를 가져와 토픽으로 데이터를 보내는 역할을 수행
    • 토픽에서 사용하는 오프셋이 아닌 자체적으로 사용하는 오프셋을 사용
profile
열심히 정리하는 습관 기르기..

0개의 댓글