이전 글
[1편] MSA 이벤트 발행 - 이론
MSA에서 비지니스 객체의 변경에 대한 이벤트를 발행할 때, 비지니스 객체 변경과 이벤트 발행이 하나의 트랜잭션으로 실행되어야하는 이유를 설명했다. 또한 이를 위한 방법으로 Outbox 패턴을 소개했다.

Course Service Architecture [그림] 강의 서비스 아키텍처 - 강의 게시 요청 상세

위 아키텍처 그림은 강사(Teacher)가 강의 게시 요청을 했을 때 강의 서비스가 어떻게 작동하는지에 대한 아키텍처를 그림으로 그린 것이다. 우선 애플리케이션 단에서 보면 1) DB 어댑터를 통해 강의 posted 값을 1로 변경하는 비지니스 로직 (CourseEntity, CourseRepository)과 2) 이벤트 메시지 발행을 위해 OUTBOX 테이블에 이벤트를 쓰는 로직 (이벤트 발행기/리스너, OutboxEvent, OutboxEventRepository)이 있다. 그리고 이 두 개의 로직은 하나의 트랜잭션으로 실행된다. 그 다음 OUTBOX 테이블에 쓰여진 이벤트를 이벤트 채널에 발행하기 위해서 debezium을 사용한다. Debezium은 OUTBOX 테이블의 트랜잭션 로그의 변경분을 하나의 메시지로 kafka 메시지 브로커에 발행한다.

강의 서비스 애플리케이션

References:

강의 서비스 애플리케이션은 Spring으로 구현했다. 코드가 길어져 핵심 로직이 잘 안보일 수도 있어 롬복(Lombok 이란?)을 적용하여 getter, setter, 생성자 등을 어노테이션으로 대체했다.

  • API Controller
    - 강의의 게시 여부를 업데이트 하는 것은 리소스의 특정 부위를 업데이트하는 것으로 하위 리소스로 path를 정의했다.
    • /courses/{courseId}/post
    • /courses/{courseId}/unpost
@RestController
@RequestMapping("/courses")
public class CourseServiceApiController {
	@Autowired
	private CourseService courseService;
    
    ...
    
    @PostMapping("/{courseId}/post")
	public CourseResponseDTO postCourseById(@PathVariable("courseId") String courseId) {
		courseService.postCourse(courseId);
		return new CourseResponseDTO(courseId);
	}
	
	@PostMapping("/{courseId}/unpost")
	public CourseResponseDTO unpostCourseById(@PathVariable("courseId") String courseId) {
		courseService.unpostCourse(courseId);
		return new CourseResponseDTO(courseId);
	}

}
  • CourseService
    강의를 게시하는 코드를 보면 DB에 강의를 게시하도록 값을 바꾸고 이벤트 버스를 통해 이벤트를 발행한다. 이때, 이 모든 과정은 Transactional 하게 이뤄진다.
@AllArgsConstructor
@Service
public class CourseService {
	private CourseRepository courseRepository;
	private ApplicationEventPublisher eventBus;
	
    ...
    
	@Transactional
	public void postCourse(String courseId) {
		CourseEntity course = courseRepository.findById(courseId).orElseThrow(() -> new CourseNotExistError());
		course.post();
		courseRepository.save(course);
		eventBus.publishEvent(CoursePostedEvent.of(course));
	}
    
    ...
	
}
  • OutboxListener
    CourseService에서 이벤트 버스를 통해 발행된 이벤트는 OutboxListener에서 처리된다. 아래 코드를 보면 OUTBOX 테이블 (OutboxEventRepository)에 이벤트를 저장한다. 그리고 해당 이벤트는 바로 삭제해준다. 그 이유는 OUTBOX 테이블은 이벤트를 기록하기 위한 목적이 아니라 이벤트 발행을 위한 임시 메시지 큐이기 때문이다. Debezium은 DB 변경을 감지하여 메시지를 발행하기 때문에 save()로 임시 메시지 큐의 메시지를 소비 (consume)한 것이다. 그렇기 때문에 소비된 메시지는 바로 삭제해준다.
@Component
@AllArgsConstructor
public class OutboxListener {
	private OutboxEventRepository repository;
	
	@EventListener
    public void onExportedEvent(Outboxable event) {
        OutboxEvent outboxEvent = OutboxEvent.from(event);
        repository.save(outboxEvent);
        repository.delete(outboxEvent);
    }

}
  • CoursePostedEvent
    OUTBOX 테이블 이벤트를 쓰기 위해서는 Outboxable 인터페이스를 상속 받아 도메인 이벤트인 CoursePostedEvent를 구현한다.
public interface Outboxable {
	String getAggregateId();
	String getAggregateType();
	String getPayload();
	String getType();
}
public class CoursePostedEvent implements Outboxable {
private static ObjectMapper MAPPER = new ObjectMapper();
	
	private final String id;
    private final JsonNode payload;
    
    public CoursePostedEvent(String id, JsonNode payload) {
        this.id = id;
        this.payload = payload;
    }
    
    public static CourseCreatedEvent of(CourseEntity course) {
    	return new CourseCreatedEvent(course.getCourseId(), MAPPER.valueToTree(course));
	}

	@Override
	public String getAggregateId() {
		return String.valueOf(id);
	}

	@Override
	public String getAggregateType() {
		return CourseEntity.class.getSimpleName().replace("Entity", "");
	}

	@Override
	public String getPayload() {
		try {
            return MAPPER.writeValueAsString(payload);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        return null;
	}

	@Override
	public String getType() {
		 return this.getClass().getSimpleName();
	}

}
  • OutboxEvent
    OutboxEvent는 OUTBOX 테이블 (OutboxEventRepository)에 저장하는 entity이다. Outboxable 인터페이스를 상속받은 도메인 이벤트를 DB에 저장할 수 있는 Entity로 바꿔주는 from() 함수가 있다.
@Getter(AccessLevel.PUBLIC)
@Setter
@Entity
@AllArgsConstructor
@RequiredArgsConstructor
@Table(name ="outbox")
public class OutboxEvent {

    @Id
    @GeneratedValue
    @Type(type = "uuid-char")
    private UUID id;

    @Column(nullable = false)
    @NonNull
    private String aggregateType;

    @Column(nullable = false)
    @NonNull
    private String aggregateId;

    @Column(nullable = false)
    @NonNull
    private String type;

    @Column(nullable = false, length = 1048576) //e.g. 1 MB max
    @NonNull
    private String payload;


    @Column(nullable = false, updatable = false, insertable = false, columnDefinition = "TIMESTAMP DEFAULT CURRENT_TIMESTAMP")
    private Timestamp timestamp;

    public static OutboxEvent from(Outboxable event) {
        return new OutboxEvent(
                event.getAggregateType(),
                event.getAggregateId(),
                event.getType(),
                event.getPayload()
        );
    }
}
  • OutboxEventRepository
    OUTBOX 테이블에 매핑되는 repository이다.
@Repository
public interface OutboxEventRepository extends JpaRepository<OutboxEvent, Long> {
	
}

Debezium

Docker로 셋팅하기

References:

version: '3'
services:
  mysql:
    image: mysql:8.0
    container_name: mysql
    ports:
      - 3307:3306
    environment:
      - MYSQL_ROOT_PASSWORD=admin
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw
    command:
      - --character-set-server=utf8mb4
      - --collation-server=utf8mb4_unicode_ci
    volumes:
      - /Users/jimin/mysql/data:/var/lib/mysql

  zookeeper:
    container_name: zookeeper
    image: debezium/zookeeper:1.4
    ports:
      - "2181:2181"
      - "2888:2888"
      - "3888:3888"

  kafka:
    container_name: kafka
    image: debezium/kafka:1.4
    depends_on:
      - zookeeper
    links:
      - zookeeper:zookeeper
    ports:
      - "9092:9092"
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
      
  connect:
    container_name: connect
    image: debezium/connect:1.4
    depends_on:
      - kafka
      - mysql
    links:
      - kafka:kafka
      - mysql:mysql
    ports:
      - "8083:8083"
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
  • mysql: 강의 서비스 database, Course 애그리거트를 저장하는 course 테이블과 outbox 이벤트를 저장하는 outbox 테이블로 구성되어 있다.
  • zookeeper: Kafka 브로커를 관리하고 조정하는데 사용된다.
  • kafka: debezium이 DB 변경분에 대한 이벤트를 발행하는 메시지 브로커이다.
  • connect: DB와 연결하여 DB의 변경분을 발굴(마이닝)하여 kafka 메시지 브로커에 메시지를 발행한다.

Docker compose를 통해 각 컨테이너를 띄우고, connect가 제대로 실행되고 있는지 확인한다.

$ docker-compose -f docker-compose.yml up -d 
$ curl http://localhost:8083/
{"version":"2.6.1","commit":"6b2021cd52659cef","kafka_cluster_id":"SdINnaM-QIyh4ZTJcUtvmA"}

Mysql 유저 권한

use mysql;

// mysqluser 에게 권한 부여
GRANT ALL PRIVILEGES ON *.* TO 'mysqluser'@'%';

FLUSH PRIVILEGES;

커넥터 생성

등록 JSON의 각 프로퍼티에 대한 설명: https://debezium.io/documentation/reference/stable/connectors/mysql.html#_required_debezium_mysql_connector_configuration_properties

$ curl --location --request POST 'http://localhost:8083/connectors' --header 'Content-Type: application/json' --data-raw '{"name":"outbox-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","tasks.max":"1","database.hostname":"mysql","database.port":"3306","database.user":"mysqluser","database.password":"mysqlpw","database.server.id":"184054","database.server.name":"dbserver1","table.include.list":"course.outbox","database.history.kafka.bootstrap.servers":"kafka:9092","database.history.kafka.topic":"course.outbox","database.allowPublicKeyRetrieval":"true"}}'
{"name":"outbox-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","tasks.max":"1","database.hostname":"mysql","database.port":"3306","database.user":"mysqluser","database.password":"mysqlpw","database.server.id":"184054","database.server.name":"dbserver1","table.include.list":"course.outbox","database.history.kafka.bootstrap.servers":"kafka:9092","database.history.kafka.topic":"course.outbox","database.allowPublicKeyRetrieval":"true","name":"outbox-connector"},"tasks":[],"type":"source"}

등록 JSON

{
   "name":"outbox-connector",
   "config":{
      "connector.class":"io.debezium.connector.mysql.MySqlConnector",
      "tasks.max":"1",
      "database.hostname":"mysql",
      "database.port":"3306",
      "database.user":"mysqluser",
      "database.password":"mysqlpw",
      "database.server.id":"184054",
      "database.server.name":"dbserver1",
      "table.include.list":"course.outbox",
      "database.history.kafka.bootstrap.servers":"kafka:9092",
      "database.history.kafka.topic":"course.outbox",
      "database.allowPublicKeyRetrieval":"true"
   }
}

커넥터 조회

$ curl -H "Accept:application/json" localhost:8083/connectors/
["outbox-connector"]

$ curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/outbox-connector
HTTP/1.1 200 OKector 
Date: Thu, 16 Feb 2023 13:18:37 GMT
Content-Type: application/json
Content-Length: 567
Server: Jetty(9.4.33.v20201020)

{"name":"outbox-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","database.allowPublicKeyRetrieval":"true","database.user":"mysqluser","database.server.id":"184054","tasks.max":"1","database.history.kafka.bootstrap.servers":"kafka:9092","database.history.kafka.topic":"course.outbox","database.server.name":"dbserver1","database.port":"3306","database.hostname":"mysql","database.password":"mysqlpw","name":"outbox-connector","table.include.list":"course.outbox"},"tasks":[{"connector":"outbox-connector","task":0}],"type":"source"}

커넥터 삭제

나중에 필요할 때 삭제

$ curl --location --request DELETE 'http://localhost:8083/connectors/outbox-connector'

kafka 등록 토픽 리스트 확인

sh-4.2$ ./bin/kafka-topics.sh --bootstrap-server kafka:9092 --list
__consumer_offsets
course.outbox
dbserver1
dbserver1.course.outbox
my_connect_configs
my_connect_offsets
my_connect_statuses

kafka에 발행된 메시지 확인

sh-4.2$ ./bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic dbserver1.course.outbox --from-beginning
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"string","optional":false,"field":"aggregate_type"},{"type":"string","optional":false,"field":"aggregate_id"},{"type":"string","optional":false,"field":"type"},{"type":"string","optional":false,"name":"io.debezium.data.Json","version":1,"field":"payload"},{"type":"string","optional":false,"name":"io.debezium.time.ZonedTimestamp","version":1,"default":"1970-01-01T00:00:00Z","field":"timestamp"}],"optional":true,"name":"dbserver1.course.outbox.Value","field":"before"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"string","optional":false,"field":"aggregate_type"},{"type":"string","optional":false,"field":"aggregate_id"},{"type":"string","optional":false,"field":"type"},{"type":"string","optional":false,"name":"io.debezium.data.Json","version":1,"field":"payload"},{"type":"string","optional":false,"name":"io.debezium.time.ZonedTimestamp","version":1,"default":"1970-01-01T00:00:00Z","field":"timestamp"}],"optional":true,"name":"dbserver1.course.outbox.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.course.outbox.Envelope"},"payload":{"before":null,"after":{"id":"33319dbb-358c-4f06-8e60-c82b81eab769","aggregate_type":"Course","aggregate_id":"bc32dd3f-2334-43d9-9493-71b538510efe","type":"CoursePostedEvent","payload":"{\"name\":\"MSA 파헤치기\",\"posted\":true,\"courseId\":\"bc32dd3f-2334-43d9-9493-71b538510efe\",\"teacherId\":\"7ead2ed1-77fa-4d70-bdfd-77e67c513bbd\",\"thumbnail\":\"msa_course.jpeg\",\"description\":\"MSA의 모든 것을 가르쳐 드립니다.\",\"registeredAt\":1676458941000,\"lastUpdatedAt\":1676598812000}","timestamp":"2023-02-17T01:53:42Z"},"source":{"version":"1.4.2.Final","connector":"mysql","name":"dbserver1","ts_ms":1676598822000,"snapshot":"false","db":"course","table":"outbox","server_id":1,"gtid":null,"file":"binlog.000015","pos":13396,"row":0,"thread":518,"query":null},"op":"c","ts_ms":1676598823103,"transaction":null}}
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"string","optional":false,"field":"aggregate_type"},{"type":"string","optional":false,"field":"aggregate_id"},{"type":"string","optional":false,"field":"type"},{"type":"string","optional":false,"name":"io.debezium.data.Json","version":1,"field":"payload"},{"type":"string","optional":false,"name":"io.debezium.time.ZonedTimestamp","version":1,"default":"1970-01-01T00:00:00Z","field":"timestamp"}],"optional":true,"name":"dbserver1.course.outbox.Value","field":"before"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"string","optional":false,"field":"aggregate_type"},{"type":"string","optional":false,"field":"aggregate_id"},{"type":"string","optional":false,"field":"type"},{"type":"string","optional":false,"name":"io.debezium.data.Json","version":1,"field":"payload"},{"type":"string","optional":false,"name":"io.debezium.time.ZonedTimestamp","version":1,"default":"1970-01-01T00:00:00Z","field":"timestamp"}],"optional":true,"name":"dbserver1.course.outbox.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.course.outbox.Envelope"},"payload":{"before":{"id":"33319dbb-358c-4f06-8e60-c82b81eab769","aggregate_type":"Course","aggregate_id":"bc32dd3f-2334-43d9-9493-71b538510efe","type":"CoursePostedEvent","payload":"{\"name\":\"MSA 파헤치기\",\"posted\":true,\"courseId\":\"bc32dd3f-2334-43d9-9493-71b538510efe\",\"teacherId\":\"7ead2ed1-77fa-4d70-bdfd-77e67c513bbd\",\"thumbnail\":\"msa_course.jpeg\",\"description\":\"MSA의 모든 것을 가르쳐 드립니다.\",\"registeredAt\":1676458941000,\"lastUpdatedAt\":1676598812000}","timestamp":"2023-02-17T01:53:42Z"},"after":null,"source":{"version":"1.4.2.Final","connector":"mysql","name":"dbserver1","ts_ms":1676598823000,"snapshot":"false","db":"course","table":"outbox","server_id":1,"gtid":null,"file":"binlog.000015","pos":14352,"row":0,"thread":518,"query":null},"op":"d","ts_ms":1676598823156,"transaction":null}}
null

카프카의 메시지를 beautify한 것이다.

{
   "schema":{
      "type":"struct",
      "fields":[
         {
            "type":"struct",
            "fields":[
               {
                  "type":"string",
                  "optional":false,
                  "field":"id"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"aggregate_type"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"aggregate_id"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"type"
               },
               {
                  "type":"string",
                  "optional":false,
                  "name":"io.debezium.data.Json",
                  "version":1,
                  "field":"payload"
               },
               {
                  "type":"string",
                  "optional":false,
                  "name":"io.debezium.time.ZonedTimestamp",
                  "version":1,
                  "default":"1970-01-01T00:00:00Z",
                  "field":"timestamp"
               }
            ],
            "optional":true,
            "name":"dbserver1.course.outbox.Value",
            "field":"before"
         },
         {
            "type":"struct",
            "fields":[
               {
                  "type":"string",
                  "optional":false,
                  "field":"id"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"aggregate_type"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"aggregate_id"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"type"
               },
               {
                  "type":"string",
                  "optional":false,
                  "name":"io.debezium.data.Json",
                  "version":1,
                  "field":"payload"
               },
               {
                  "type":"string",
                  "optional":false,
                  "name":"io.debezium.time.ZonedTimestamp",
                  "version":1,
                  "default":"1970-01-01T00:00:00Z",
                  "field":"timestamp"
               }
            ],
            "optional":true,
            "name":"dbserver1.course.outbox.Value",
            "field":"after"
         },
         {
            "type":"struct",
            "fields":[
               {
                  "type":"string",
                  "optional":false,
                  "field":"version"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"connector"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"name"
               },
               {
                  "type":"int64",
                  "optional":false,
                  "field":"ts_ms"
               },
               {
                  "type":"string",
                  "optional":true,
                  "name":"io.debezium.data.Enum",
                  "version":1,
                  "parameters":{
                     "allowed":"true,last,false"
                  },
                  "default":"false",
                  "field":"snapshot"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"db"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"table"
               },
               {
                  "type":"int64",
                  "optional":false,
                  "field":"server_id"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"gtid"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"file"
               },
               {
                  "type":"int64",
                  "optional":false,
                  "field":"pos"
               },
               {
                  "type":"int32",
                  "optional":false,
                  "field":"row"
               },
               {
                  "type":"int64",
                  "optional":true,
                  "field":"thread"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"query"
               }
            ],
            "optional":false,
            "name":"io.debezium.connector.mysql.Source",
            "field":"source"
         },
         {
            "type":"string",
            "optional":false,
            "field":"op"
         },
         {
            "type":"int64",
            "optional":true,
            "field":"ts_ms"
         },
         {
            "type":"struct",
            "fields":[
               {
                  "type":"string",
                  "optional":false,
                  "field":"id"
               },
               {
                  "type":"int64",
                  "optional":false,
                  "field":"total_order"
               },
               {
                  "type":"int64",
                  "optional":false,
                  "field":"data_collection_order"
               }
            ],
            "optional":true,
            "field":"transaction"
         }
      ],
      "optional":false,
      "name":"dbserver1.course.outbox.Envelope"
   },
   "payload":{
      "before":null,
      "after":{
         "id":"33319dbb-358c-4f06-8e60-c82b81eab769",
         "aggregate_type":"Course",
         "aggregate_id":"bc32dd3f-2334-43d9-9493-71b538510efe",
         "type":"CoursePostedEvent",
         "payload":"{\"name\":\"MSA 파헤치기\",\"posted\":true,\"courseId\":\"bc32dd3f-2334-43d9-9493-71b538510efe\",\"teacherId\":\"7ead2ed1-77fa-4d70-bdfd-77e67c513bbd\",\"thumbnail\":\"msa_course.jpeg\",\"description\":\"MSA의 모든 것을 가르쳐 드립니다.\",\"registeredAt\":1676458941000,\"lastUpdatedAt\":1676598812000}",
         "timestamp":"2023-02-17T01:53:42Z"
      },
      "source":{
         "version":"1.4.2.Final",
         "connector":"mysql",
         "name":"dbserver1",
         "ts_ms":1676598822000,
         "snapshot":"false",
         "db":"course",
         "table":"outbox",
         "server_id":1,
         "gtid":null,
         "file":"binlog.000015",
         "pos":13396,
         "row":0,
         "thread":518,
         "query":null
      },
      "op":"c",
      "ts_ms":1676598823103,
      "transaction":null
   }
}

보다시피 하나의 메시지의 양만해도 방대하다. 우리가 관심있는 정보는 payload 부분인데 schema 내용만 해도 메시지가 엄청 길어진다. 또한 지금은 강의 서비스가 강의에 대한 이벤트만 발행하고 있는데 하나의 서비스가 여러 애그리커트에 대한 이벤트를 발행할 수도 있다. 예를 들어, 아래 그림처럼 주문 서비스 (Order service)가 주문 이벤트 (Order event)와 고객 이벤트 (Customer event)를 발행할 수 있다. 즉, 하나의 OUTBOX 테이블에서 서로 다른 종류의 이벤트를 구분하여 다른 topic으로 이벤트를 발행해야 하는 것이다. 그 밖에도 메시지 브로커를 통해 메시지를 소비하다 보면 다양한 이유로 중복된 메시지가 수신되기도 한다. 이는 다음 편에서 다루도록 한다.

[그림] 사용자 서비스 이벤트 아키텍처

다음 글
[3편] MSA 이벤트 발행 - Debezium Transformations
다음 편에서는 카프카 메시지에서 관심없는 부분을 없애 메시지 사이즈를 줄이고 하나의 OUTBOX 테이블에서 여러 토픽으로 메시지를 라우팅하는 방법에 대해 알아보고자 한다.

profile
개발하는 사람입니다.

0개의 댓글

Powered by GraphCDN, the GraphQL CDN