kafka connect를 이용한 데이터 저장

greenTea·2024년 11월 23일

kafka connect

카프카 커넥트는 아파치 카프카와 다른 데이터 시스템 간의 데이터를 확장 가능하고 안정적으로 스트리밍하기 위한 도구이다

Confluent Documentation에서 설명하는 글의 일부분을 가져와봤습니다. 위 정의를 좀 더 쉽게 설명하자면 A -> B 로 데이터를 이동 시켜주는 도구라고 생각하면 됩니다.
이를 이용한다면 사용자가 보낸 데이터를 수작업으로 가공 처리해서 보내는 것이 아니라 자동화해서 원하는 곳으로 이동/적재 할 수 있어 매우 유용합니다.

그럼 간단하게 어떻게 사용하는지 알아보겠습니다.

Docker-Compose 설정

아래는 Zookeeper, Kafka, Kafka Connect, Kafka UI 서비스를 포함한 Docker Compose 설정입니다.
여기서 핵심은 Kafka Connect입니다.

version: '3'
services:
  zookeeper:
    image: zookeeper
    ports:
      - "2181:2181"
  
  kafka:
    image: bitnami/kafka:3.4.0
    ports:
      - '9092:9092'
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENERS=LC://kafka:29092,LX://kafka:9092,CONTROLLER://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=LC://kafka:29092,LX://${DOCKER_HOST_IP:-localhost}:9092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=LC:PLAINTEXT,LX:PLAINTEXT,CONTROLLER:PLAINTEXT
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=LC
      - KAFKA_ADVERTISED_HOST_NAME=127.0.0.1
    depends_on:
      - zookeeper
  
  kafka-connect:
    image: confluentinc/cp-kafka-connect:latest
    depends_on:
      - kafka
      - mariadb
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "kafka:29092"
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: "quickstart-avro"
      CONNECT_CONFIG_STORAGE_TOPIC: "quickstart-avro-config"
      CONNECT_OFFSET_STORAGE_TOPIC: "quickstart-avro-offsets"
      CONNECT_STATUS_STORAGE_TOPIC: "quickstart-avro-status"
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_REST_ADVERTISED_HOST_NAME: "localhost"
      CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/etc/kafka-connect/jars"
    volumes:
      - ./jars:/etc/kafka-connect/jars

  kafka-ui:
    image: provectuslabs/kafka-ui
    ports:
      - "8989:8080"
    depends_on:
      - kafka
      - zookeeper
    environment:
      - KAFKA_CLUSTERS_0_NAME=local
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:29092
      - KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper:2181

Note:
kafka connect를 사용하기 위해서는 이를 위한 connector가 필요합니다. 직접 만들어서 사용할 수 도 있으나 MySQL Connector, S3 Connector 등 다양한 Connector가 이미 존재하고 있기에 이를 활용하는 것이 더 좋습니다. 여기서는 MySQL Connector를 사용하였습니다.

jars 폴더에 필요한 JAR 파일 (예: MySQL Connector, JDK JAR 등)을 넣으면 됩니다.

Kafka 설정

Kafka Producer 설정을 위한 Spring Configuration 클래스입니다.

@EnableKafka
@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(properties);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

기본적인 연결과 직렬화 옵션을 설정하였습니다.

Kafka 데이터 전송 예제

Kafka Producer를 사용하여 정해진 구조에 맞는 데이터를 전송하는 코드입니다.

@RequiredArgsConstructor
@Slf4j
@Service
public class OrderProducer {
    private final KafkaTemplate<String, String> kafkaTemplate;

    List<Field> fields = Arrays.asList(
        new Field("string", true, "order_id"),
        new Field("string", true, "user_id"),
        new Field("string", true, "product_id"),
        new Field("int32", true, "qty"),
        new Field("int32", true, "unit_price"),
        new Field("int32", true, "total_price")
    );
    Schema schema = Schema.builder()
                          .type("struct")
                          .fields(fields)
                          .optional(false)
                          .name("orders")
                          .build();

    public OrderDto send(String topic, OrderDto orderDto) {
        Payload payload = Payload.builder()
                                 .order_id(orderDto.getOrderId())
                                 .user_id(orderDto.getUserId())
                                 .product_id(orderDto.getProductId())
                                 .qty(orderDto.getQty())
                                 .unit_price(orderDto.getUnitPrice())
                                 .total_price(orderDto.getTotalPrice())
                                 .build();
        KafkaOrderDto kafkaOrderDto = new KafkaOrderDto(schema, payload);
        ObjectMapper mapper = new ObjectMapper();
        String jsonString = "";
        try {
            jsonString = mapper.writeValueAsString(kafkaOrderDto);
        } catch (Exception e) {
            e.printStackTrace();
        }

        kafkaTemplate.send(topic, jsonString);
        return orderDto;
    }
}

json처럼 kafka에 데이터를 보낼때에도 각 Connector에 맞는 데이터 구조를 이용해서 보내야 합니다. 저는 Mysql을 사용하고 있기에 이에 맞게 정의하여 보냈습니다.

Kafka Connect 등록

Kafka Connect를 이용하여 Kafka Topic 데이터를 MySQL로 저장합니다. 아래는 Kafka Connect 설정 예제입니다. 이를 통해 mysql의 정보를 넘겨주면서 Server -> kafka -> Mysql의 순서로 데이터가 이동할 수 있게 하였습니다.(각 옵션의 경우 기본적인 연결과정만을 거쳐준 상황이라고 생각하시면 될 것 같습니다.)

POST /connectors HTTP/1.1
Host: 127.0.0.1:8083
Content-Type: application/json

{
  "name": "mysql_sink_connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "my_topic",
    "connection.url": "jdbc:mysql://localhost:3306/mydatabase",
    "connection.user": "myuser",
    "connection.password": "mypassword",
    "auto.create": "true",
    "auto.evolve": "true",
    "delete.enabled": "false"
  }
}

Kafka Connect 목록 확인

Kafka Connect에 등록된 모든 커넥터 목록을 확인합니다.

GET /connectors HTTP/1.1
Host: 127.0.0.1:8083

결과

위 설정을 통해 Kafka Producer로 데이터를 전송하면 Kafka Topic에 데이터가 저장되고, Kafka Connect를 통해 해당 데이터를 MySQL 데이터베이스에 저장할 수 있습니다. 이 과정을 통해 실시간 데이터 파이프라인을 구축할 수 있습니다.

profile
greenTea입니다.

0개의 댓글