[Apache Kafka] Kafka Connect를 통한 파이프라인 구축

youngtae·2023년 9월 11일
0

Apache Kafka

목록 보기
3/3
post-thumbnail

Kafka Connect

Kafka와 외부 데이터 소스나 저장소 간에 데이터를 효율적으로 이동시키기 위한 프레임워크
이를 통해 kafka 토픽에 저장되는 데이터를 실시간으로 감지해서 추출할 수 있다.

  • 플러그 가능한 아키텍처: Kafka Connect는 다양한 소스와 싱크를 위한 플러그인(커넥터)을 지원한다. 커넥터는 오픈소스로 제공되거나 커뮤니티나 개발자에 의해 만들어질 수 있다.
  • 분산 및 확장성: Kafka Connect는 분산 환경에서 실행될 수 있다. 이는 데이터 처리량이 늘어나거나 시스템이 복잡해져도 쉽게 확장 가능하다는 것을 의미
  • 오프셋 관리: 내부적으로 Kafka에서 오프셋을 관리하기 때문에, 장애가 발생했을 때도 데이터 무결성을 유지할 수 있다.
  • 설정 기반의 실행: 코드를 작성할 필요 없이 간단한 JSON 또는 YAML 파일로 Kafka Connect를 설정할 수 있다.
  • 스트림과 배치 모드 지원: Kafka Connect는 실시간 스트림 처리뿐만 아니라, 배치 작업도 지원

    결과적으로, Kafka Connect를 이용하면 코드를 작성하지 않고 설정만으로 다양한 외부 시스템과 Kafka 간의 데이터 이동을 처리할 수 있어 운영적인 편의성이 높다.

Kafka connect vs Kafka connector

  • connect는 외부 시스템과 kafka의 데이터 이동 작업을 처리하는 프레임워크
  • connector는 connect 프레임워크 안에서 실행되는 플러그인으로 로직을 담당, 커넥터는 소스 커넥터와 싱크 커넥터로 구분된다.

  • 소스 커넥터(Source Connector): 외부 시스템(예: RDBMS, 로그 파일, 외부 API 등)에서 데이터를 가져와 Kafka 토픽으로 전달
  • 싱크 커넥터(Sink Connector): Kafka 토픽에서 데이터를 가져와 외부 시스템(예: RDBMS, ElasticSearch, HDFS 등)으로 전달

Kafka Connect 설치 및 실행

  • ec2 인스턴스 환경에서 도커 컴포즈를 통해 kafka 클러스터를 만들어 놓은 뒤의 작업과정임을 유의
  • kafka 패키지의 일부이므로 따로 설치할 필요없이 설정 파일 수정을 통해 실행 가능
  kafka-connect:
    image: confluentinc/cp-kafka-connect:5.5.1
    depends_on:
      - kafka-1
      - kafka-2
      - kafka-3
    ports:
      - "8083:8083"
    environment:
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/java/kafka-connect-elasticsearch"
      CONNECT_CONFIG_STORAGE_TOPIC: "my_connect_configs"
      CONNECT_OFFSET_STORAGE_TOPIC: "my_connect_offsets"
      CONNECT_STATUS_STORAGE_TOPIC: "my_connect_statuses"
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_BOOTSTRAP_SERVERS: kafka-1:29092,kafka-2:29093,kafka-3:29094
      CONNECT_REST_PORT: 8083
      CONNECT_REST_ADVERTISED_HOST_NAME: "localhost"
      CONNECT_GROUP_ID: compose-connect-group

docker-compose 파일에 위의 kafka connect 내용 추가 후 실행

elasticsearch와 연결

Kafka Connect Elasticsearch 플러그인 설치

Confluent Hub를 통해서 설치. Confluent Hub는 Kafka Connect에서 사용되는 다양한 플러그인을 간편하게 다운로드할 수 있는 저장소

# 폴더 만들고 이동
$ mkdir confluent-hub
$ cd confluent-hub
# 현재 경로에 다운로드하고 압축해제
$ curl -L -O http://client.hub.confluent.io/confluent-hub-client-latest.tar.gz
$ tar -zxvf confluent-hub-client-latest.tar.gz
# 하위 폴더 추가
$ mkdir plugins
# PATH 환경변수 추가
$ vi ~/.bashrc
export CONFLUENT_HOME='~/confluent-hub'
export PATH=$PATH:$CONFLUENT_HOME/bin
$ source ~/.bashrc
# Elasticsearch Connector 설치
$ cd ./confluent-hub/bin
$ confluent-hub install confluentinc/kafka-connect-elasticsearch:5.5.1 --no-prompt --component-dir /home/ubuntu/confluent-hub/plugins
  • --component-dir : 플러그인을 설치할 설치경로
# kafka-connect 부분에 아래 내용 추가
    volumes:
      - /home/ubuntu/confluent-hub/plugins/confluentinc-kafka-connect-elasticsearch:/usr/share/java/kafka-connect-elasticsearch

엘라스틱서치 커넥터 등록

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
    "name": "elasticsearch-sink",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "tasks.max": "1",
        "topics": "coupang",
        "key.ignore": "true",
        "schema.ignore": "true",
        "connection.url": "http://{ip주소}:9200",
        "type.name": "kafka-connect",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "false"
    }
}'
profile
나의 개발기록

0개의 댓글