배포
, 운영
하는것은 비효율적커넥터
를 실행함으로써 반복작업을 줄일 수 있음.출처: https://debezium.io/documentation/reference/1.3/architecture.html
사용자가 커넥트에 커넥터 생성 명령을 내리면 커넥트는 내부적으로 커넥터와 태스크를 생성한다. 사용자가 커넥터를 사용하여 파이프라인을 생성할 때 컨버터
와 트랜스폼
기능을 옵션으로 추가할 수 있다.
JsonConverter
, StringConverter
, ByteArrayConverter
를 지원하고 필요하다면 커스텀 컨버터를 작성할 수 있다.Cast
, Drop
, ExtractField
등이 있다.HDFS
, AWS S3
, JDBC 커넥터
, 엘라스틱서치 커넥터
등이있다.커넥트를 실행하는 방법은 크게 2가지가 존재한다.
출처 : https://turkogluc.com/apache-kafka-connect-introduction/
요청 메소드 | 호출 경로 | 설명 |
---|---|---|
GET | / | 실행 중인 커넥트 정보확인 |
GET | /connectors | 실행 중인 커넥터 이름 확인 |
POST | /connectors | 새로운 커넥터 생성 요청 |
GET | /connectors/{커넥터 이름} | 실행중인 커넥터 정보 확인 |
GET | /connectors/{커넥터 이름}/config | 실행중인 커넥터의 설정값 확인 |
PUT | /connectors/{커넥터 이름}/config | 실행중인 커넥터 설정값 변경 요청 |
GET | /connectors/{커넥터 이름}/status | 실행중인 커넥터 상태 확인 |
POST | /connectors/{커넥터 이름}/restart | 실행중인 커넥터 재시작 요청 |
PUT | /connectors/{커넥터 이름}/pause | 커넥터 일시 중지 요청 |
PUT | /connectors/{커넥터 이름}/resume | 일시 중지된 커넥터 실행 요청 |
DELETE | /connectors/{커넥터 이름}/ | 실행 중인 커넥터 종료 |
GET | /connectors/{커넥터 이름}/tasks/{태스크 아이디}/status | 실행 중인 커넥터의 태스크 상태 확인 |
POST | /connectors/{커넥터 이름}/tasks/{태스크 아이디}/restart | 실행 중인 커넥터의 태스크 재시작 요청 |
GET | /connectors/{커넥터 이름}/topics | 커넥트에 존재하는 커넥터 플러그인 확인 |
PUT | /connector-plugins/ | 커넥트에 존재하는 커넥터 플러그인 확인 |
PUT | /connector-plugins/{커넥터 플러그인 이름}/config/validate | 커넥터 생성 시 설정값 유효 여부 확인 |
connect-standalone.properties
파일을 수정![image-20210629003252734](/Users/kimdonggeun/Library/Application Support/typora-user-images/image-20210629003252734.png)
위 이미지는 docker http://wurstmeister.github.io/kafka-docker/ 에서 설정된 binary 파일내부의 config이다.
#### connect-standalone.properties
# 커넥트와 연동할 카프카 클러스터의 호스트와 포트 작성
bootstrap.servers=localhost:9092
# 데이터를 카프카에 저장할 때 혹은 카프카에서 데이터를 가져올 때 변환하는 데에 사용한다.
# 카프카 커넥트는 JsonConverter, StringConverter, ByteArrayConverter를 기본으로 제공
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# 만약 스키마 형태로 제공 X -> enable option을 false로 설정
key.converter.schemas.enable=true
value.converter.schemas.enable=true
#단일모드 커넥트는 로컬파일에 오프셋 정보를 저장.
# 오프셋 정보는 소스커넥터 또는 싱크 커넥터가 데이터 처리 시점을 저장하기 위해 사용됨.
offset.storage.file.filename=/tmp/connect.offsets
# 태스크가 처리 완료한 오프셋을 커밋하는 주기를 설정
offset.flush.interval.ms=10000
#플러그인 형태로 추가할 커넥터의 디렉토리 주소를 입력
plugin.path=
connect-file-source.properties
로 저장되어있다.#### connect-file-source.properties
# 커넥터의 이름
name=local-file-source
#사용할 커넥터의 클래스 이름을 지정
connector.class=FileStreamSource
#커넥터로 실행할 태스크 개수를 지정
tasks.max=1
#읽을 파일의 위치
file=test.txt
# 읽은 파일의 데이터를 저장할 토픽의 이름을 지정
topic=connect-test
connect-standalone.sh config/connect-standalone.properties \
config/connect-file-source.properties
connect-distributed.properties
를 보면 알 수 있다.##### connect-distributed.properties
# 카프카 브로커 서버
bootstrap.servers=localhost:9092
# 다수의 카프카 프로세스들을 묶을 그룹 이름
group.id=connect-cluster
# 데이터를 카프카에 저장할 때 혹은 카프카에서 데이터를 가져올 때 변환하는데 사용
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# 스키마 형태를 사용하고 싶지 않다면 enable 옵션을 false로 변경한다.
key.converter.schemas.enable=true
value.converter.schemas.enable=true
# 분산 모드 커넥트는 카프카 내부 토픽에 오프셋 정보를 저장.
# 이 오프셋 정보는 소스 커넥터 또는 싱크 커넥터가 데이터 처리 시점을 저장하기 위해 사용함.
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
#offset.storage.partitions=25
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
#status.storage.partitions=5
# 오프셋을 커밋하는 주기
offset.flush.interval.ms=10000
# These are provided to inform the user about the presence of the REST host and port configs
# Hostname & Port for the REST API to listen on. If this is set, it will bind to the interface used to listen to requests.
#rest.host.name=
#rest.port=8083
# The Hostname & Port that will be given out to other workers to connect to i.e. URLs that are routable from other servers.
#rest.advertised.host.name=
#rest.advertised.port=
# 추가 플러그인 경로!
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
#plugin.path=
REST API
를 통해 실행/중단/변경할 수 있음.connect-distributed.sh ../config/connect-distributed.properties