🤬 DB 부하 어떻게 줄일까?

Polling-Publisher 모델은 지속적으로 데이터베이스에 조회 쿼리를 실행해서, 처리되지 않은 메시지를 가져옵니다.

그로 인해 데이터베이스에 많은 부하가 발생하며, 부가적으로 메시지 처리Update 쿼리를 실행시키는 등의 작업으로 추가적인 부하도 일어납니다.

DB의 부하를 줄이고, 데이터의 생성 뿐만 아니라 업데이트, 삭제 등의 이벤트도 감지할 수 있는 CDC를 활용하여 마지막 개선을 해보겠습니다.

😇 그래서 CDC가 뭔데?

CDC(Change Data Capture)는 데이터가 변경되는 시점변경 항목을 추적하고 이러한 변경이 전파되어야 하는 다른 시스템이나 서비스에 알림을 전송하는 패턴입니다.

CDC 솔루션도 여러가지 제품이 있지만 그 중에서 Debezium MySQL conenctor를 사용하도록 하겠습니다.

이 그림을 보면 주문 서비스에서 주문이 발생하고 데이터의 변경(저장)이 일어나서 Debezium Event Router 를 통해 Kafka로 메시지가 발행되고 토픽구독하는 배송, 회원 서비스가 메시지를 수신합니다.

이제 직접 Debezium MySQL Connector를 설정해보겠습니다.

😗 Debezium MySQL Connector 설정

저는 MySQL, Kafka, Zookeeper, Spring Service, Debezium Connectordocker-compose로 구성했습니다.

version: '3.7'

services:
  mysql1:
    image: mysql:8.0
    platform: linux/x86_64
    container_name: mysql1
    environment:
      MYSQL_ROOT_PASSWORD: root
      MYSQL_DATABASE: post_service_db
    volumes:
      - mysql1_data:/var/lib/mysql
      - ./mysql/my1.cnf:/etc/mysql/my.cnf
    ports:
      - "3306:3306"
    networks:
      my-network:
        ipv4_address: 172.20.0.2

  mysql2:
    image: mysql:8.0
    platform: linux/x86_64
    container_name: mysql2
    environment:
      MYSQL_ROOT_PASSWORD: root
      MYSQL_DATABASE: subscribe_service_db
    volumes:
      - mysql2_data:/var/lib/mysql
      - ./mysql/my2.cnf:/etc/mysql/my.cnf
    ports:
      - "3307:3306"
    networks:
      my-network:
        ipv4_address: 172.20.0.3

  zookeeper:
    image: debezium/zookeeper:2.7
    container_name: zookeeper
    ports:
      - "2181:2181"
      - "2888:2888"
      - "3888:3888"

  kafka:
    image: debezium/kafka:2.7
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181

  connect:
    image: debezium/connect:2.7
    container_name: connect
    depends_on:
      - kafka
      - mysql1
    ports:
      - "8083:8083"
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses

  subscribe-service:
    build: ../v3/subscribe-service
    container_name: subscribe-service
    ports:
      - "8082:8082"
    depends_on:
      - mysql2
      - kafka
    environment:
      SPRING_DATASOURCE_URL: jdbc:mysql://mysql2:3306/subscribe_service_db?useSSL=false&serverTimezone=UTC
      SPRING_DATASOURCE_USERNAME: root
      SPRING_DATASOURCE_PASSWORD: root
      SPRING_KAFKA_BOOTSTRAP_SERVERS: kafka:9092
    restart: always

  post-service:
    build: ../v3/post-service
    container_name: post-service
    ports:
      - "8081:8081"
    depends_on:
      - mysql1
    environment:
      SPRING_DATASOURCE_URL: jdbc:mysql://mysql1:3306/post_service_db?useSSL=false&serverTimezone=UTC
      SPRING_DATASOURCE_USERNAME: root
      SPRING_DATASOURCE_PASSWORD: root
    restart: always

volumes:
  mysql1_data:
  mysql2_data:

MySQL에서 사용되는 my.cnf 파일은 아래와 같습니다.

[client]
default-character-set = utf8mb4

[mysql]
default-character-set = utf8mb4

# MySQL 서버(daemon)의 설정을 지정한다.
[mysqld]
character-set-client-handshake = FALSE
character-set-server           = utf8mb4
collation-server               = utf8mb4_unicode_ci
default-time-zone='+9:00'

# 이진 로그를 사용해서 MySQL 서버에서 발행하는 변경 사항을 기록한다. mysql-bin은 이진로그의 파일 이름을 지정한다.
log-bin = mysql-bin

# MySQL 사용자의 인증 플러그인을 mysql_native_password로 지정한다 -> 5.7 이전 버전과의 호환성을 유지하기 위해 사용되는 매커니즘이다.
default_authentication_plugin=mysql_native_password

여기서 가장 중요한 부분은 이진 로그를 사용하는 설정을 하는 구간입니다.
Debezium MySQL Connector는 이진 로그를 활용하여 데이터의 변경을 추적하기 때문입니다.

이제 순서대로 Debezium Connector를 설정해보겠습니다.

  1. Debezium Connector에 새로운 Connector 설정하기
{
  "name": "outbox-connector",  
  "config": {  
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",  
    "database.hostname": "mysql1",  
    "database.port": "3306",
    "database.user": "root",
    "database.password": "root",
    "database.server.id": "184054",  
    "topic.prefix": "post-create-event",  
    "table.include.list": "post_service_db.post_event_outbox",  
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",  
    "schema.history.internal.kafka.topic": "post-create-event"  
  }
}

위의 JSONBody에 담아서 POST http://localhost:8083/connectors 요청을 보냅니다.

각 필드에 대해서 간단히 설명을 드리면 아래와 같습니다.

  • database.hostname : DB 접속 IP (같은 네트워크를 사용하므로 컨테이너 이름을 지정)
  • database.server.id : 데이터베이스 클라이언트의 숫자 ID로, MySQL 클러스터에서 현재 실행 중인 모든 데이터베이스 프로세스에서 고유해야 합니다.
  • topic.prefix : 모든 Kafka 토픽의 접두사로 사용됩니다.
  • table.include.list : database.tableName 형식으로 변경 사항을 캡쳐하려는 테이블을 지정합니다.

요청을 보내면 아래와 같은 응답이 오고, Connector가 생성됩니다.

더 자세한 내용은 Debezium Tutorial 공식 문서를 확인해주세요:)

이러면 설정이 잘 됬는지 궁금해지실겁니다.
제가 확인한 방법은 Kafka 토픽을 조회해서 Debezium Connector가 토픽을 생성했는지 확인하는 방식입니다.

$ kafka-topics.sh --bootstrap-server=kafka:9092 --list

이 명령어로 Kafka에 생성된 토픽을 조회하면 커넥터가 연결되었다는 것을 알 수 있습니다.

그렇다면 post-create-event에는 어떤 메시지가 있는지 보겠습니다.

./kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic post-create-event --from-beginning

명령어로 토픽에 생성된 메시지를 볼 수 있습니다.

{
   "schema":{
      "type":"struct",
      "fields":[
         {
            "type":"struct",
            "fields":[
               {
                  "type":"string",
                  "optional":false,
                  "field":"version"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"connector"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"name"
               },
               {
                  "type":"int64",
                  "optional":false,
                  "field":"ts_ms"
               },
               {
                  "type":"string",
                  "optional":true,
                  "name":"io.debezium.data.Enum",
                  "version":1,
                  "parameters":{
                     "allowed":"true,last,false,incremental"
                  },
                  "default":"false",
                  "field":"snapshot"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"db"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"sequence"
               },
               {
                  "type":"int64",
                  "optional":true,
                  "field":"ts_us"
               },
               {
                  "type":"int64",
                  "optional":true,
                  "field":"ts_ns"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"table"
               },
               {
                  "type":"int64",
                  "optional":false,
                  "field":"server_id"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"gtid"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"file"
               },
               {
                  "type":"int64",
                  "optional":false,
                  "field":"pos"
               },
               {
                  "type":"int32",
                  "optional":false,
                  "field":"row"
               },
               {
                  "type":"int64",
                  "optional":true,
                  "field":"thread"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"query"
               }
            ],
            "optional":false,
            "name":"io.debezium.connector.mysql.Source",
            "field":"source"
         },
         {
            "type":"int64",
            "optional":false,
            "field":"ts_ms"
         },
         {
            "type":"string",
            "optional":true,
            "field":"databaseName"
         },
         {
            "type":"string",
            "optional":true,
            "field":"schemaName"
         },
         {
            "type":"string",
            "optional":true,
            "field":"ddl"
         },
         {
            "type":"array",
            "items":{
               "type":"struct",
               "fields":[
                  {
                     "type":"string",
                     "optional":false,
                     "field":"type"
                  },
                  {
                     "type":"string",
                     "optional":false,
                     "field":"id"
                  },
                  {
                     "type":"struct",
                     "fields":[
                        {
                           "type":"string",
                           "optional":true,
                           "field":"defaultCharsetName"
                        },
                        {
                           "type":"array",
                           "items":{
                              "type":"string",
                              "optional":false
                           },
                           "optional":true,
                           "field":"primaryKeyColumnNames"
                        },
                        {
                           "type":"array",
                           "items":{
                              "type":"struct",
                              "fields":[
                                 {
                                    "type":"string",
                                    "optional":false,
                                    "field":"name"
                                 },
                                 {
                                    "type":"int32",
                                    "optional":false,
                                    "field":"jdbcType"
                                 },
                                 {
                                    "type":"int32",
                                    "optional":true,
                                    "field":"nativeType"
                                 },
                                 {
                                    "type":"string",
                                    "optional":false,
                                    "field":"typeName"
                                 },
                                 {
                                    "type":"string",
                                    "optional":true,
                                    "field":"typeExpression"
                                 },
                                 {
                                    "type":"string",
                                    "optional":true,
                                    "field":"charsetName"
                                 },
                                 {
                                    "type":"int32",
                                    "optional":true,
                                    "field":"length"
                                 },
                                 {
                                    "type":"int32",
                                    "optional":true,
                                    "field":"scale"
                                 },
                                 {
                                    "type":"int32",
                                    "optional":false,
                                    "field":"position"
                                 },
                                 {
                                    "type":"boolean",
                                    "optional":true,
                                    "field":"optional"
                                 },
                                 {
                                    "type":"boolean",
                                    "optional":true,
                                    "field":"autoIncremented"
                                 },
                                 {
                                    "type":"boolean",
                                    "optional":true,
                                    "field":"generated"
                                 },
                                 {
                                    "type":"string",
                                    "optional":true,
                                    "field":"comment"
                                 },
                                 {
                                    "type":"string",
                                    "optional":true,
                                    "field":"defaultValueExpression"
                                 },
                                 {
                                    "type":"array",
                                    "items":{
                                       "type":"string",
                                       "optional":false
                                    },
                                    "optional":true,
                                    "field":"enumValues"
                                 }
                              ],
                              "optional":false,
                              "name":"io.debezium.connector.schema.Column",
                              "version":1
                           },
                           "optional":false,
                           "field":"columns"
                        },
                        {
                           "type":"string",
                           "optional":true,
                           "field":"comment"
                        }
                     ],
                     "optional":true,
                     "name":"io.debezium.connector.schema.Table",
                     "version":1,
                     "field":"table"
                  }
               ],
               "optional":false,
               "name":"io.debezium.connector.schema.Change",
               "version":1
            },
            "optional":false,
            "field":"tableChanges"
         }
      ],
      "optional":false,
      "name":"io.debezium.connector.mysql.SchemaChangeValue",
      "version":1
   },
   "payload":{
      "source":{
         "version":"2.7.0.Final",
         "connector":"mysql",
         "name":"post-create-event",
         "ts_ms":1719795356101,
         "snapshot":"true",
         "db":"post_service_db",
         "sequence":null,
         "ts_us":1719795356101473,
         "ts_ns":1719795356101473000,
         "table":"post_event_outbox",
         "server_id":0,
         "gtid":null,
         "file":"mysql-bin.000003",
         "pos":3205,
         "row":0,
         "thread":null,
         "query":null
      },
      "ts_ms":1719795356103,
      "databaseName":"post_service_db",
      "schemaName":null,
      "ddl":"CREATE TABLE `post_event_outbox` (\n  `author_id` bigint NOT NULL,\n  `created_at` timestamp NOT NULL,\n  `post_id` bigint NOT NULL,\n  `updated_at` timestamp NOT NULL,\n  `event_id` binary(16) NOT NULL,\n  `title` text COLLATE utf8mb4_unicode_ci NOT NULL,\n  PRIMARY KEY (`event_id`)\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci",
      "tableChanges":[
         {
            "type":"CREATE",
            "id":"\"post_service_db\".\"post_event_outbox\"",
            "table":{
               "defaultCharsetName":"utf8mb4",
               "primaryKeyColumnNames":[
                  "event_id"
               ],
               "columns":[
                  {
                     "name":"author_id",
                     "jdbcType":-5,
                     "nativeType":null,
                     "typeName":"BIGINT",
                     "typeExpression":"BIGINT",
                     "charsetName":null,
                     "length":null,
                     "scale":null,
                     "position":1,
                     "optional":false,
                     "autoIncremented":false,
                     "generated":false,
                     "comment":null,
                     "defaultValueExpression":null,
                     "enumValues":null
                  },
                  {
                     "name":"created_at",
                     "jdbcType":2014,
                     "nativeType":null,
                     "typeName":"TIMESTAMP",
                     "typeExpression":"TIMESTAMP",
                     "charsetName":null,
                     "length":null,
                     "scale":null,
                     "position":2,
                     "optional":false,
                     "autoIncremented":false,
                     "generated":false,
                     "comment":null,
                     "defaultValueExpression":null,
                     "enumValues":null
                  },
                  {
                     "name":"post_id",
                     "jdbcType":-5,
                     "nativeType":null,
                     "typeName":"BIGINT",
                     "typeExpression":"BIGINT",
                     "charsetName":null,
                     "length":null,
                     "scale":null,
                     "position":3,
                     "optional":false,
                     "autoIncremented":false,
                     "generated":false,
                     "comment":null,
                     "defaultValueExpression":null,
                     "enumValues":null
                  },
                  {
                     "name":"updated_at",
                     "jdbcType":2014,
                     "nativeType":null,
                     "typeName":"TIMESTAMP",
                     "typeExpression":"TIMESTAMP",
                     "charsetName":null,
                     "length":null,
                     "scale":null,
                     "position":4,
                     "optional":false,
                     "autoIncremented":false,
                     "generated":false,
                     "comment":null,
                     "defaultValueExpression":null,
                     "enumValues":null
                  },
                  {
                     "name":"event_id",
                     "jdbcType":-2,
                     "nativeType":null,
                     "typeName":"BINARY",
                     "typeExpression":"BINARY",
                     "charsetName":null,
                     "length":16,
                     "scale":null,
                     "position":5,
                     "optional":false,
                     "autoIncremented":false,
                     "generated":false,
                     "comment":null,
                     "defaultValueExpression":null,
                     "enumValues":null
                  },
                  {
                     "name":"title",
                     "jdbcType":12,
                     "nativeType":null,
                     "typeName":"TEXT",
                     "typeExpression":"TEXT",
                     "charsetName":"utf8mb4",
                     "length":null,
                     "scale":null,
                     "position":6,
                     "optional":false,
                     "autoIncremented":false,
                     "generated":false,
                     "comment":null,
                     "defaultValueExpression":null,
                     "enumValues":null
                  }
               ],
               "comment":null
            }
         }
      ]
   }
}

이 메시지는 여러가지 메타데이터 설정 및 post_event_outbox 테이블의 DDL이 실행된 히스토리 정보임을 알 수 있습니다.

그렇다면 실제 데이터가 변경 되었을때 기록은 어떤 토픽에 있을까요?
게시글 작성 요청을 한 번 보내보겠습니다.

요청을 보낸 뒤, 토픽 리스트를 확인해보니 새로운 토픽이 생겼습니다.

토픽메시지를 확인해보면 아래와 같습니다.

{
   "schema":{
      "type":"struct",
      "fields":[
         {
            "type":"struct",
            "fields":[
               {
                  "type":"int64",
                  "optional":false,
                  "field":"author_id"
               },
               {
                  "type":"string",
                  "optional":false,
                  "name":"io.debezium.time.ZonedTimestamp",
                  "version":1,
                  "field":"created_at"
               },
               {
                  "type":"int64",
                  "optional":false,
                  "field":"post_id"
               },
               {
                  "type":"string",
                  "optional":false,
                  "name":"io.debezium.time.ZonedTimestamp",
                  "version":1,
                  "field":"updated_at"
               },
               {
                  "type":"bytes",
                  "optional":false,
                  "field":"event_id"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"title"
               }
            ],
            "optional":true,
            "name":"post-create-event.post_service_db.post_event_outbox.Value",
            "field":"before"
         },
         {
            "type":"struct",
            "fields":[
               {
                  "type":"int64",
                  "optional":false,
                  "field":"author_id"
               },
               {
                  "type":"string",
                  "optional":false,
                  "name":"io.debezium.time.ZonedTimestamp",
                  "version":1,
                  "field":"created_at"
               },
               {
                  "type":"int64",
                  "optional":false,
                  "field":"post_id"
               },
               {
                  "type":"string",
                  "optional":false,
                  "name":"io.debezium.time.ZonedTimestamp",
                  "version":1,
                  "field":"updated_at"
               },
               {
                  "type":"bytes",
                  "optional":false,
                  "field":"event_id"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"title"
               }
            ],
            "optional":true,
            "name":"post-create-event.post_service_db.post_event_outbox.Value",
            "field":"after"
         },
         {
            "type":"struct",
            "fields":[
               {
                  "type":"string",
                  "optional":false,
                  "field":"version"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"connector"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"name"
               },
               {
                  "type":"int64",
                  "optional":false,
                  "field":"ts_ms"
               },
               {
                  "type":"string",
                  "optional":true,
                  "name":"io.debezium.data.Enum",
                  "version":1,
                  "parameters":{
                     "allowed":"true,last,false,incremental"
                  },
                  "default":"false",
                  "field":"snapshot"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"db"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"sequence"
               },
               {
                  "type":"int64",
                  "optional":true,
                  "field":"ts_us"
               },
               {
                  "type":"int64",
                  "optional":true,
                  "field":"ts_ns"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"table"
               },
               {
                  "type":"int64",
                  "optional":false,
                  "field":"server_id"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"gtid"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"file"
               },
               {
                  "type":"int64",
                  "optional":false,
                  "field":"pos"
               },
               {
                  "type":"int32",
                  "optional":false,
                  "field":"row"
               },
               {
                  "type":"int64",
                  "optional":true,
                  "field":"thread"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"query"
               }
            ],
            "optional":false,
            "name":"io.debezium.connector.mysql.Source",
            "field":"source"
         },
         {
            "type":"struct",
            "fields":[
               {
                  "type":"string",
                  "optional":false,
                  "field":"id"
               },
               {
                  "type":"int64",
                  "optional":false,
                  "field":"total_order"
               },
               {
                  "type":"int64",
                  "optional":false,
                  "field":"data_collection_order"
               }
            ],
            "optional":true,
            "name":"event.block",
            "version":1,
            "field":"transaction"
         },
         {
            "type":"string",
            "optional":false,
            "field":"op"
         },
         {
            "type":"int64",
            "optional":true,
            "field":"ts_ms"
         },
         {
            "type":"int64",
            "optional":true,
            "field":"ts_us"
         },
         {
            "type":"int64",
            "optional":true,
            "field":"ts_ns"
         }
      ],
      "optional":false,
      "name":"post-create-event.post_service_db.post_event_outbox.Envelope",
      "version":2
   },
   "payload":{
      "before":null,
      "after":{
         "author_id":1,
         "created_at":"2024-06-30T19:40:00Z",
         "post_id":8,
         "updated_at":"2024-06-30T19:40:00Z",
         "event_id":"C2bcliDMQGWvIjM7UmDfnQ==",
         "title":"Rex Test - 1"
      },
      "source":{
         "version":"2.7.0.Final",
         "connector":"mysql",
         "name":"post-create-event",
         "ts_ms":1719808800000,
         "snapshot":"false",
         "db":"post_service_db",
         "sequence":null,
         "ts_us":1719808800000000,
         "ts_ns":1719808800000000000,
         "table":"post_event_outbox",
         "server_id":1,
         "gtid":null,
         "file":"mysql-bin.000003",
         "pos":5958,
         "row":0,
         "thread":103,
         "query":null
      },
      "transaction":null,
      "op":"c",
      "ts_ms":1719808800201,
      "ts_us":1719808800201982,
      "ts_ns":1719808800201982000
   }
}

payload.after 를 보면 데이터가 어떻게 저장되어 있는지 알 수 있습니다.
여기서 알 수 있는 점은 데이터 변경시 메시지가 쌓이는 토픽은 {topic.prefix}.{database}.{table} 규칙으로 생성된다는 것입니다.

이제 V2의 코드를 약간 수정하여 적용시켜 보겠습니다.

😎 V2 -> V3 마지막 개선

Post-Service의 변경 포인트는 스케줄러 삭제Kafka 의존성 제거입니다.

Subscribe-ServiceKafka Listener에 변경이 있어, 이 부분을 수정해보겠습니다.

@EnableKafka
@Configuration
@RequiredArgsConstructor
public class KafkaConsumerConfig {

    private final KafkaConsumerProperties kafkaConsumerProperties;
    private final CreatePostEventDeserializer createPostEventDeserializer;

    @Bean
    ConcurrentKafkaListenerContainerFactory<String, CreatePostEvent> concurrentKafkaListenerContainerFactory() {
        final ConcurrentKafkaListenerContainerFactory<String, CreatePostEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    ConsumerFactory<String, CreatePostEvent> consumerFactory() {

        Map<String, Object> consumerConfigs = Map.of(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConsumerProperties.getBootstrapServers(),
                ConsumerConfig.GROUP_ID_CONFIG, kafkaConsumerProperties.getGroupId(),
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CreatePostEventDeserializer.class,
                JsonDeserializer.TRUSTED_PACKAGES, "*",
                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"
        );

        return new DefaultKafkaConsumerFactory<>(consumerConfigs, new StringDeserializer(), new ErrorHandlingDeserializer<>(createPostEventDeserializer));
    }

}

Consumer Factory를 직렬화와 역직렬화를 모두 문자열 기반으로 하도록 수정했습니다.
schema.payload.after 데이터를 사용해야 하는데 하나의 비즈니스 처리를 위해 세 개의 객체를 중첩해서 쌓는 것은 좋지 않다고 판단했습니다.

따라서 CustomDeserializer를 작성하여 Config Class에 넣어주었습니다.

@Slf4j
@Component
@RequiredArgsConstructor
public class CreatePostEventDeserializer implements Deserializer<CreatePostEvent> {

    private final ObjectMapper objectMapper;

    @Override
    public CreatePostEvent deserialize(String topic, byte[] data) {
        try {

            log.info("이벤트 수신");

            JsonNode extractedData = objectMapper.readTree(data)
                    .get("payload")
                    .get("after");


            log.error("새로운 이벤트 = [{}]", extractedData.toPrettyString());

            return objectMapper.treeToValue(extractedData, CreatePostEvent.class);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
    
}

KafkaListener는 기존과 거의 동일합니다.

@Slf4j
@Component
@RequiredArgsConstructor
public class CreatePostListener {

    private final SendNotificationUseCase sendNotificationUseCase;

    @KafkaListener(topics = "post-create-event.post_service_db.post_event_outbox", groupId = "post-group", containerFactory = "concurrentKafkaListenerContainerFactory")
    void consumeEvent(CreatePostEvent event) {
        log.info("Create Post Event 수신 Request Data = [{}], 수신 날짜 = [{}]", event, LocalDateTime.now());

        final SendNotificationServiceRequest serviceRequest = SendNotificationServiceRequest.of(event.eventId(), event.postId(),event.authorId(), event.title());
        sendNotificationUseCase.sendNotification(serviceRequest);
    }
}

메시지 데이터의 필드와 CreatePostEvent 객체의 필드는 표기법의 차이가 있습니다.
따라서, @JsonProperty 애너테이션을 이용해서 맞춰주겠습니다.

public record CreatePostEvent(
        @JsonProperty("event_id")
        UUID eventId,
        @JsonProperty("post_id")
        Long postId,
        @JsonProperty("author_id")
        Long authorId,
        String title) {
}

이제 테스트를 통해 정상 동작하는지 확인해보겠습니다.

😇 정상 케이스 테스트

테스트 방식은 V2와 같이 Jmeter1초에 10번 게시글 작성 요청을 보냅니다.

메시지가 구독자들에게 정상적으로 도착한 것을 확인할 수 있습니다.

🧐 비정상 케이스 테스트

이번에는 ZookeeperKafka, Debezium Connector를 모두 끄고 테스트 해보겠습니다.

  • Jmeter1초에 10번 게시글 작성 요청을 보냅니다.

  • Zookeeperkafka, Debezium Connector를 다시 켜줍니다.

  • 메시지정상 수신되는 것을 확인할 수 있습니다.

깨알 Trouble Shooting!
컨테이너를 종료하고 다시 실행하는 과정에서, 도커 캐시로 인하여 이전 IP 주소로 연결을 시도해 컨테이너끼리 연결이 안되는 문제가 발생했습니다.

저는 docker network를 명시적으로 생성하고, IP 주소고정 할당하여 문제를 해결했습니다.

docker network create --driver bridge --subnet 172.20.0.0/16 service-network
version: '3.7'

services:
  mysql1:
    image: mysql:8.0
    platform: linux/x86_64
    container_name: mysql1
    environment:
      MYSQL_ROOT_PASSWORD: root
      MYSQL_DATABASE: post_service_db
    volumes:
      - mysql1_data:/var/lib/mysql
      - ./mysql/my1.cnf:/etc/mysql/my.cnf
    ports:
      - "3306:3306"
    networks:
      my-network:
        ipv4_address: 172.20.0.2

  mysql2:
    image: mysql:8.0
    platform: linux/x86_64
    container_name: mysql2
    environment:
      MYSQL_ROOT_PASSWORD: root
      MYSQL_DATABASE: subscribe_service_db
    volumes:
      - mysql2_data:/var/lib/mysql
      - ./mysql/my2.cnf:/etc/mysql/my.cnf
    ports:
      - "3307:3306"
    networks:
      my-network:
        ipv4_address: 172.20.0.3

  zookeeper:
    image: debezium/zookeeper:2.7
    container_name: zookeeper
    ports:
      - "2181:2181"
      - "2888:2888"
      - "3888:3888"
    networks:
      my-network:
        ipv4_address: 172.20.0.6

  kafka:
    image: debezium/kafka:2.7
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
    networks:
      my-network:
        ipv4_address: 172.20.0.7

  connect:
    image: debezium/connect:2.7
    container_name: connect
    depends_on:
      - kafka
      - mysql1
    ports:
      - "8083:8083"
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
    networks:
      my-network:
        ipv4_address: 172.20.0.8

  subscribe-service:
    build: ../v3/subscribe-service
    container_name: subscribe-service
    ports:
      - "8082:8082"
    depends_on:
      - mysql2
      - kafka
    environment:
      SPRING_DATASOURCE_URL: jdbc:mysql://mysql2:3306/subscribe_service_db?useSSL=false&serverTimezone=UTC
      SPRING_DATASOURCE_USERNAME: root
      SPRING_DATASOURCE_PASSWORD: root
      SPRING_KAFKA_BOOTSTRAP_SERVERS: kafka:9092
    networks:
      my-network:
        ipv4_address: 172.20.0.9
    restart: always

  post-service:
    build: ../v3/post-service
    container_name: post-service
    ports:
      - "8081:8081"
    depends_on:
      - mysql1
    environment:
      SPRING_DATASOURCE_URL: jdbc:mysql://mysql1:3306/post_service_db?useSSL=false&serverTimezone=UTC
      SPRING_DATASOURCE_USERNAME: root
      SPRING_DATASOURCE_PASSWORD: root
    networks:
      my-network:
        ipv4_address: 172.20.0.10
    restart: always

networks:
  my-network:
    external: true
    name: service-network

volumes:
  mysql1_data:
  mysql2_data:

🙇🏻 다음으로,,

총 3편의 포스팅을 통해 Transactional Outbox Pattern을 간단히 구현해보고 학습했습니다.

특히 CDC 같은 경우에는 이진 로그 기반으로 데이터의 변경을 추적하여 이벤트의 순서가 뒤바뀌거나 Polling 처럼 데이터베이스 부하를 일으킬 염려도 적습니다.

Debezium은 자체적으로 Exactly Once 옵션을 지원하지만 데이터 정합성 문제는 매우 중요하므로 혹시 모를 사태에 대비하기 위해 Consumer의 멱등한 처리는 유지했습니다.

CDCMySQL에 제공된 유저의 정보로 로그인하여 이진 로그를 확인해, 스냅샷을 만들고 초기 스냅샷 생성 이후로 MySQL의 바이너리 로그를 지속적으로 모니터링 합니다.

이를 통해 실시간성을 유지할 수 있으며, 이 방식 또한 MySQL에 지속적으로 연결되어 있어야 하지만 추가적인 쿼리 부하를 발생시키지 않아 효율적입니다.

지속적인 연결 유지로 인해 일부 리소스가 사용되지만 데이터베이스에 직접적으로 부하를 주는 주기적인 폴링 방식보다는 훨씩 적은 부하발생시킵니다.

따라서, 저는 CDC 방식이 Transactional Outbox Pattern을 구현하는 적합한 방식이라고 생각합니다.

지금까지 이 시리즈를 읽어주셔서 감사합니다.

  • V3 코드 레포지토리 -> 이동

🙇🏻

2개의 댓글

comment-user-thumbnail
2024년 8월 2일

시리즈 잘 읽었습니다. 그런데 간단히 동기적으로 카프카에 이벤트를 보낸 뒤, two-phase commit을 써서 간단하게 해결할 수도 있었을 거 같은데 db에 이벤트를 저장하시는 방법을 쓰신 이유가 있을까요?

1개의 답글