Apache Kafka Connect 활용

이원석·2023년 12월 8일
0

ApacheKafka

목록 보기
3/3
post-thumbnail
본 내용은 인프런의 이도원님의 SpringCloud 강의를 참고하여 작성되었습니다.



많은 양의 트래픽이 서버에 전달되고 DB에 저장하는 과정에서 문제 (데드락 등) 가 생긴다면 서비스에 문제가 생길수 있다.

이는 서버와 DB가 동기적으로 작동하며 높은 결합도를 가지고 있기 때문인데, 데이터를 저장하는 방식을 비동기적으로 처리할 수 있는 메시지 큐 서비스(Kafka) 를 사용한다면 이러한 문제를 어느정도 방지할 수 있다! 서버는 저장되는 데이터를 메시지 브로커(Kafka) 에 던지기만 하고 다음 작업을 처리하면 된다.

보통, Connect Sink 는 데이터를 저장하는 과정에서 많이 사용되며, 삭제나 변경은 복잡한 트랜잭션이나 상황이 요구될 수 있기 때문에 잘 사용하지 않는다고 한다.



1. Kafka Connect Sink

https://velog.io/@wonseok97/Apache-Kafka-Connect

이전 포스팅에서 Kafka Connect Source, Sink의 역활에 대해 알아보았다.

이번 포스팅에서는 진행중인 프로젝트 리팩토링에 Kafka Sink를 활용하고자 한다.

[Flow]

1. Comment Service :
서비스에서 데이터를 생성 및 직렬화 하고 Kafka Producer 를 사용하여 Kafka Cluster 의 특정 Topic 으로 데이터를 전송합니다.


2. Kafka Cluster :
Kafka Clustertopic 에 도착한 데이터를 파티션으로 분배하고 안전하게 저장합니다.


3. Kafka Connect Sink:
Kafka Connect Sink 는 특정 topic 을 구독하여 Cluster 로 부터 데이터를 가져와 외부 시스템에 저장합니다. 이때 쿼리 정보를 사용하여 데이터를 가져옵니다.



Zookeeper - Kafka Server - Kafka Connect 순으로 서버를 기동시켜 주자.

[Zookeeper]
$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties

[Kafka Server]
$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties

[Kafka Connect]
$KAFKA_HOME/bin/connect-distributed $KAFKA_HOME/etc/kafka/connect-distributed.properties



Cluster의 Topic에 등록된 데이터를 가져오기 위해서는 Kafka Connect Sink Connector에 대한 설정 정보가 필요하다.

{
    "name": "my-comment-sink-connect",

    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",

        // 데이터베이스 연결 정보
        "connection.url": "jdbc:mysql://localhost:3306/userdb",
        "connection.user": "root",
        "connection.password": "1234",

        // 새로운 테이블 자동 생성
        "auto.create": "true",

        // 테이블 스키마 자동 업데이트
        "auto.evolve": "true",

        // 레코드 삭제 비활성화
        "delete.enabled": "false",

        // 작업 수 최대 1개로 설정
        "tasks.max": "1",

        // 구독할 토픽 이름
        "topics": "comment"
    }
}
  • connector.class: 사용할 커넥터 클래스를 지정.

  • auto.create: true로 설정, 커넥터가 지정한 토픽에 대응하는 데이터베이스 테이블을 자동으로 생성.

  • auto.evolve: true로 설정, 데이터베이스의 스키마가 변경되면 자동으로 테이블의 스키마를 업데이트합니다.

  • delete.enabled: false로 설정, 레코드 삭제가 비활성화.

  • tasks.max: 작업 수를 최대 1개로 설정.

  • topics: 구독할 Kafka 토픽의 이름을 지정.

http://localhost:[Kafka Connect Port]/connectors 에 POST 해서 해당 설정 정보를 등록해주자.

이제, my-comment-sink-connect 라는 이름을 가진 Kafka Connect Sink Connector는 Comment 라는 이름의 Topic 을 구독하여 메시지가 추가될 경우 Cluster 를 통해 가져오게 된다.



2. Kafka Producer

Apache Kafka에 데이터를 생성하고 보내기 위해서는 Kafka Producer를 정의하고 Bean으로 등록하는 과정이 필요하다.


@EnableKafka
@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        return new DefaultKafkaProducerFactory<>(properties);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

ProducerFactory 메서드에서는 HashMap 타입을 통해 Kafka Producer에 필요한 설정들을 지정 한다.

  • BOOTSTRAP_SERVERS_CONFIG: Kafka Broker Address
  • KEY_SERIALIZER_CLASS_CONFIG: 메시지 키 값의 직렬화 하겠다!
  • VALUE_SERIALIZER_CLASS_CONFIG: 메시지의 값을 직렬화 하겠다!



3. 직렬화를 위한 Schema 형태

[Avro Schema]

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "string",
        "optional": true,
        "field": "content"
      },
      {
        "type": "long",
        "optional": true,
        "field": "meetingId"
      },
      {
        "type": "string",
        "optional": true,
        "field": "userCode"
      }
    ],
    "optional": false,
    "name": "comment"
  },
  "payload": {
    "content": "testContent",
    "meetingId": 1,
    "userCode": "testcode"
  }
}

해당 JSON은 Apache Kafka의 Schema Registry에서 사용되는 Avro Schema와 메시지의 조합이다. Avro는 데이터 직렬화 및 저장을 위한 프레임워크로서, 스키마에 기반하여 이진 형식으로 데이터를 효율적으로 직렬화하고 역직렬화 한다.


  • schema 오브젝트:
"schema": {
    "type": "struct",
    "fields": [
     {
        "type": "string",
        "optional": true,
        "field": "content"
      },
      {
        "type": "long",
        "optional": true,
        "field": "meetingId"
      },
      {
        "type": "string",
        "optional": true,
        "field": "userCode"
      }
    ],
    "optional": false,
    "name": "comment"
  }

type: 스키마의 종류로, 여기서는 "struct"로 지정되어 구조체 형태의 데이터를 다루는 스키마이다.
fields: 스키마의 필드 목록을 포함합니다. 각 필드는 type, optional, field와 같은 속성들을 가지고 있다.
optional: 필드가 선택적인지 여부

  • payload 오브젝트:
  "payload": {
    "content": "testContent",
    "meeting_id": 1,
    "user_code": "testCode"
  }

payload 오브젝트는 실제 메시지 데이터가 들어 있는 부분이다.

해당 형식의 Schema 데이터를 JSON 오브젝트 형태로 직렬화 하는 과정이 필요하다! 이 작업을 바로 CommentProducer에서 처리할 예정이며, 구조체의 형태는 클래스를 활용하자.



4. CommentProducer

@Service
@Slf4j
@RequiredArgsConstructor
public class CommentProducer {
    private final KafkaTemplate<String, String> kafkaTemplate;

    List<Field> fields = Arrays.asList(
            new Field("string", true, "content"),
            new Field("int64", true, "meeting_id"),
            new Field("string", true, "user_code")
    );

    Schema schema = Schema.builder()
            .type("struct")
            .fields(fields)
            .optional(false)
            .name("comment")
            .build();


    public void send(String topic, CommentResponseDto commentDto) {

        Payload payload = Payload.builder()
                .content(commentDto.getContent())
                .meeting_id(commentDto.getMeetingId())
                .user_code(commentDto.getUserCode())
                .build();

        KafkaCommentDto kafkaCommentDto = new KafkaCommentDto(schema, payload);

        ObjectMapper mapper = new ObjectMapper();
        String jsonInString = "";

        try {
            jsonInString = mapper.writeValueAsString(kafkaCommentDto);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }

        kafkaTemplate.send(topic, jsonInString);
    }
}

@Data
@AllArgsConstructor
public class Field {
    private String type;
    private boolean optional;
    private String field;
}

@Data
@AllArgsConstructor
public class KafkaCommentDto implements Serializable {
    private Schema schema;
    private Payload payload;
}

@Data
@Builder
public class Payload {
    private String content;
    private Long meeting_id;
    private String user_code;
}

@Data
@Builder
public class Schema {
    private String type;
    private List<Field> fields;
    private boolean optional;
    private String name;
}

KafkaCommentDto 에 최종적으로 직렬화 할 Schema와 Payload를 담아야 한다.

데이터를 저장할 테이블의 형식에 맞춰 field를 생성해주고, 이를 Schema에 저장해 준다.
전달할 데이터를 (commentDto) 가져와 Payload에 저장해 주자.

kafkaTempalte을 통해 Topic의 이름과 이를 JSON 형태의 문자열로 직렬화 한 jsonInString 값을 전달해 주면 끝!





참고문헌

0개의 댓글

관련 채용 정보