[3편] MSA 이벤트 발행 - Debezium Transformations

이지민·2023년 2월 22일
0

MSA

목록 보기
3/3

이전글
[2편] MSA 이벤트 발행 - 구현
Outbox 패턴을 적용하여 비지니스 객체 변경과 이벤트 발행이 하나의 트랜잭션으로 동작하는 강의 서비스 애플리케이션을 구현했다.

지난 글에서 debezium을 이용하여 OUTBOX에 쓴 메시지를 kafka 메시지 브로커로 발행하는 방법에 대해 알아봤다. 하지만 강의를 게시하는 하나의 트랜잭션에 메시지가 다음가 같이 3개가 발생한다.

sh-4.2$ ./bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic dbserver1.course.outbox --from-beginning
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"string","optional":false,"field":"aggregate_type"},{"type":"string","optional":false,"field":"aggregate_id"},{"type":"string","optional":false,"field":"type"},{"type":"string","optional":false,"name":"io.debezium.data.Json","version":1,"field":"payload"},{"type":"string","optional":false,"name":"io.debezium.time.ZonedTimestamp","version":1,"default":"1970-01-01T00:00:00Z","field":"timestamp"}],"optional":true,"name":"dbserver1.course.outbox.Value","field":"before"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"string","optional":false,"field":"aggregate_type"},{"type":"string","optional":false,"field":"aggregate_id"},{"type":"string","optional":false,"field":"type"},{"type":"string","optional":false,"name":"io.debezium.data.Json","version":1,"field":"payload"},{"type":"string","optional":false,"name":"io.debezium.time.ZonedTimestamp","version":1,"default":"1970-01-01T00:00:00Z","field":"timestamp"}],"optional":true,"name":"dbserver1.course.outbox.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.course.outbox.Envelope"},"payload":{"before":null,"after":{"id":"33319dbb-358c-4f06-8e60-c82b81eab769","aggregate_type":"Course","aggregate_id":"bc32dd3f-2334-43d9-9493-71b538510efe","type":"CoursePostedEvent","payload":"{\"name\":\"MSA 파헤치기\",\"posted\":true,\"courseId\":\"bc32dd3f-2334-43d9-9493-71b538510efe\",\"teacherId\":\"7ead2ed1-77fa-4d70-bdfd-77e67c513bbd\",\"thumbnail\":\"msa_course.jpeg\",\"description\":\"MSA의 모든 것을 가르쳐 드립니다.\",\"registeredAt\":1676458941000,\"lastUpdatedAt\":1676598812000}","timestamp":"2023-02-17T01:53:42Z"},"source":{"version":"1.4.2.Final","connector":"mysql","name":"dbserver1","ts_ms":1676598822000,"snapshot":"false","db":"course","table":"outbox","server_id":1,"gtid":null,"file":"binlog.000015","pos":13396,"row":0,"thread":518,"query":null},"op":"c","ts_ms":1676598823103,"transaction":null}}
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"string","optional":false,"field":"aggregate_type"},{"type":"string","optional":false,"field":"aggregate_id"},{"type":"string","optional":false,"field":"type"},{"type":"string","optional":false,"name":"io.debezium.data.Json","version":1,"field":"payload"},{"type":"string","optional":false,"name":"io.debezium.time.ZonedTimestamp","version":1,"default":"1970-01-01T00:00:00Z","field":"timestamp"}],"optional":true,"name":"dbserver1.course.outbox.Value","field":"before"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"string","optional":false,"field":"aggregate_type"},{"type":"string","optional":false,"field":"aggregate_id"},{"type":"string","optional":false,"field":"type"},{"type":"string","optional":false,"name":"io.debezium.data.Json","version":1,"field":"payload"},{"type":"string","optional":false,"name":"io.debezium.time.ZonedTimestamp","version":1,"default":"1970-01-01T00:00:00Z","field":"timestamp"}],"optional":true,"name":"dbserver1.course.outbox.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.course.outbox.Envelope"},"payload":{"before":{"id":"33319dbb-358c-4f06-8e60-c82b81eab769","aggregate_type":"Course","aggregate_id":"bc32dd3f-2334-43d9-9493-71b538510efe","type":"CoursePostedEvent","payload":"{\"name\":\"MSA 파헤치기\",\"posted\":true,\"courseId\":\"bc32dd3f-2334-43d9-9493-71b538510efe\",\"teacherId\":\"7ead2ed1-77fa-4d70-bdfd-77e67c513bbd\",\"thumbnail\":\"msa_course.jpeg\",\"description\":\"MSA의 모든 것을 가르쳐 드립니다.\",\"registeredAt\":1676458941000,\"lastUpdatedAt\":1676598812000}","timestamp":"2023-02-17T01:53:42Z"},"after":null,"source":{"version":"1.4.2.Final","connector":"mysql","name":"dbserver1","ts_ms":1676598823000,"snapshot":"false","db":"course","table":"outbox","server_id":1,"gtid":null,"file":"binlog.000015","pos":14352,"row":0,"thread":518,"query":null},"op":"d","ts_ms":1676598823156,"transaction":null}}
null

그 이유는 애플리케이션에서 이벤트 발생 시 outbox에 이벤트를 쓰고 지운다. 이에 따라 위와 같이 create 메시지와 delete 메시지가 발생한 것이다. 하지만 우리는 강의를 게시하는 하나의 트랜잭션에 하나의 메시지만 생성하면 된다.

@Component
@AllArgsConstructor
public class OutboxListener {
	private OutboxEventRepository repository;
	
	@EventListener
    public void onExportedEvent(Outboxable event) {
        OutboxEvent outboxEvent = OutboxEvent.from(event);
        repository.save(outboxEvent);
        repository.delete(outboxEvent);
    }
}

그 중 첫번째 메시지를 beautify 한 것이다. 보이는 것과 같이 리가 관심있는 정보 외에 많은 정보를 가지고 있어 메시지의 길다. 따라서 메시지 사이즈가 커지는 문제가 있다.

{
   "schema":{
      "type":"struct",
      "fields":[
         {
            "type":"struct",
            "fields":[
               {
                  "type":"string",
                  "optional":false,
                  "field":"id"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"aggregate_type"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"aggregate_id"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"type"
               },
               {
                  "type":"string",
                  "optional":false,
                  "name":"io.debezium.data.Json",
                  "version":1,
                  "field":"payload"
               },
               {
                  "type":"string",
                  "optional":false,
                  "name":"io.debezium.time.ZonedTimestamp",
                  "version":1,
                  "default":"1970-01-01T00:00:00Z",
                  "field":"timestamp"
               }
            ],
            "optional":true,
            "name":"dbserver1.course.outbox.Value",
            "field":"before"
         },
         {
            "type":"struct",
            "fields":[
               {
                  "type":"string",
                  "optional":false,
                  "field":"id"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"aggregate_type"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"aggregate_id"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"type"
               },
               {
                  "type":"string",
                  "optional":false,
                  "name":"io.debezium.data.Json",
                  "version":1,
                  "field":"payload"
               },
               {
                  "type":"string",
                  "optional":false,
                  "name":"io.debezium.time.ZonedTimestamp",
                  "version":1,
                  "default":"1970-01-01T00:00:00Z",
                  "field":"timestamp"
               }
            ],
            "optional":true,
            "name":"dbserver1.course.outbox.Value",
            "field":"after"
         },
         {
            "type":"struct",
            "fields":[
               {
                  "type":"string",
                  "optional":false,
                  "field":"version"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"connector"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"name"
               },
               {
                  "type":"int64",
                  "optional":false,
                  "field":"ts_ms"
               },
               {
                  "type":"string",
                  "optional":true,
                  "name":"io.debezium.data.Enum",
                  "version":1,
                  "parameters":{
                     "allowed":"true,last,false"
                  },
                  "default":"false",
                  "field":"snapshot"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"db"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"table"
               },
               {
                  "type":"int64",
                  "optional":false,
                  "field":"server_id"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"gtid"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"file"
               },
               {
                  "type":"int64",
                  "optional":false,
                  "field":"pos"
               },
               {
                  "type":"int32",
                  "optional":false,
                  "field":"row"
               },
               {
                  "type":"int64",
                  "optional":true,
                  "field":"thread"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"query"
               }
            ],
            "optional":false,
            "name":"io.debezium.connector.mysql.Source",
            "field":"source"
         },
         {
            "type":"string",
            "optional":false,
            "field":"op"
         },
         {
            "type":"int64",
            "optional":true,
            "field":"ts_ms"
         },
         {
            "type":"struct",
            "fields":[
               {
                  "type":"string",
                  "optional":false,
                  "field":"id"
               },
               {
                  "type":"int64",
                  "optional":false,
                  "field":"total_order"
               },
               {
                  "type":"int64",
                  "optional":false,
                  "field":"data_collection_order"
               }
            ],
            "optional":true,
            "field":"transaction"
         }
      ],
      "optional":false,
      "name":"dbserver1.course.outbox.Envelope"
   },
   "payload":{
      "before":null,
      "after":{
         "id":"33319dbb-358c-4f06-8e60-c82b81eab769",
         "aggregate_type":"Course",
         "aggregate_id":"bc32dd3f-2334-43d9-9493-71b538510efe",
         "type":"CoursePostedEvent",
         "payload":"{\"name\":\"MSA 파헤치기\",\"posted\":true,\"courseId\":\"bc32dd3f-2334-43d9-9493-71b538510efe\",\"teacherId\":\"7ead2ed1-77fa-4d70-bdfd-77e67c513bbd\",\"thumbnail\":\"msa_course.jpeg\",\"description\":\"MSA의 모든 것을 가르쳐 드립니다.\",\"registeredAt\":1676458941000,\"lastUpdatedAt\":1676598812000}",
         "timestamp":"2023-02-17T01:53:42Z"
      },
      "source":{
         "version":"1.4.2.Final",
         "connector":"mysql",
         "name":"dbserver1",
         "ts_ms":1676598822000,
         "snapshot":"false",
         "db":"course",
         "table":"outbox",
         "server_id":1,
         "gtid":null,
         "file":"binlog.000015",
         "pos":13396,
         "row":0,
         "thread":518,
         "query":null
      },
      "op":"c",
      "ts_ms":1676598823103,
      "transaction":null
   }
}

이와 같은 문제들을 해결하기 위해 debezium이 kafka에 발행하는 메시지를 변형하기 위한 방법으로 transformations 기능을 활용하고자 한다.

Debezium Transformations

커넥터는 transformations 기능을 활용하여 kafka에 메시지를 발행하기 전에 각 메시지를 가볍게 변형할 수 있다. 여러 변형을 위한 single message transformations (SMTs)가 있다. (자세한 내용은 공식 페이지를 참고)

이 글에서는 OUTBOX 패턴을 구현하기 위해 여러 SMT 중에 Outbox Event Router를 활용하고자한다.

Outbox Event Router

OUTBOX 테이블

OUTBOX 테이블의 column을 다음과 같이 정의한다.

CREATE TABLE `outbox` (
  `id` varchar(36) NOT NULL,
  `aggregatetype` varchar(255) NOT NULL,
  `aggregateid` varchar(255) NOT NULL,
  `type` varchar(255) NOT NULL,
  `payload` json NOT NULL,
  `timestamp` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;

[코드] MySQL OUTBOX 테이블 DDL

ColumnTypeModifiers
iduuidnot null
aggregatetypecharacter varying(255)not null
aggregateidcharacter varying(255)not null
typecharacter varying(255)not null
payloadjsonb
  • id: 이벤트의 유니크한 ID, 메시지의 헤더에 포함, 중복 메시지 처리 시 활용, 메시지 발행 시 UUID 자동 생성
  • aggregatetype: 메시지를 발행하는 애그리거트 타입, kafka에 메시지 발행 시 outbox.event.${routedByValue} 토픽에 발행되는데 이때 기본으로 ${routedByValue}에 aggregatetype 값으로 설정, 예시) Course, User
  • aggregateid: payload의 id 값으로 kafka 파티션에서 순서를 유지하는데 필요함, 예시) 강의 ID, 사용자 ID
  • type: 이벤트 타입, 예시) 강의가 게시됨 (CoursePostedEvent),
  • payload: OUTBOX 테이블의 변경 이벤트, 이벤트의 내용이 담김

커넥터 생성

아래 json을 course-outbox-connector.json으로 저장한다. 아래 json 내용을 보면 Outbox Event Router SMT를 적용하기 위해 transforms.* 관련 설정이 추가된 것을 확인 할 수 있다. (기존 json과 비교)

{
   "name":"course-outbox-connector",
   "config":{
      "connector.class":"io.debezium.connector.mysql.MySqlConnector",
      "tasks.max":"1",
      "database.hostname":"mysql",
      "database.port":"3306",
      "database.user":"mysqluser",
      "database.password":"mysqlpw",
      "database.server.id":"184054",
      "database.server.name":"dbserver1",
      "database.include.list": "course",
      "table.include.list":"course.outbox",
      "database.history.kafka.bootstrap.servers":"kafka:9092",
      "database.history.kafka.topic":"course.outbox",
      "database.allowPublicKeyRetrieval":"true",
      "transforms":"outbox",
      "transforms.outbox.type":"io.debezium.transforms.outbox.EventRouter",
      "transforms.outbox.table.fields.additional.placement": "type:envelope:eventType"
   }
}

[코드] Course 서비스이 OUTBOX 커넥터 생성 JSON

$ curl --location --request POST 'http://localhost:8083/connectors' --header 'Content-Type: application/json' -d @./course-outbox-connector.json

[코드] Course 서비스이 OUTBOX 커넥터 생성

그리고 강의 게시 요청을 하여 강의가 게시됨 (CoursePostedEvent) 이벤트를 발생 시킨다.

메시지 확인

$ curl --location --request POST 'http://127.0.0.1:8080/courses/abe1a13c-a1f5-41a5-8c22-85fe434f358a/post'

그리고 outbox.event.Course 토픽으로 kafka 메시지를 확인하면 아래와 같은 메시지를 받아 볼 수 있다. 기존의 메시지에 비해 훨씬 간결해지고 필요한 정보만 담겨 있는 것을 확인 할 수 있다.

sh-5.1$ ./bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic outbox.event.Course --from-beginning --property print.headers=true
id:4404c026-23c6-42a9-a1ff-770419b04ecd	{"schema":{"type":"struct","fields":[{"type":"string","optional":false,"name":"io.debezium.data.Json","version":1,"field":"payload"},{"type":"string","optional":false,"field":"eventType"}],"optional":false,"name":"dbserver1.course.outbox.Course.Value"},"payload":{"payload":"{\"name\":\"MSA 파헤치기\",\"posted\":true,\"courseId\":\"abe1a13c-a1f5-41a5-8c22-85fe434f358a\",\"teacherId\":\"7ead2ed1-77fa-4d70-bdfd-77e67c513bbd\",\"thumbnail\":\"msa_course.jpeg\",\"description\":\"MSA의 모든 것을 가르쳐 드립니다.\",\"registeredAt\":1677021515000,\"lastUpdatedAt\":1677033421000}","eventType":"CoursePostedEvent"}}

[코드] Outbox 메시지 예시

{
    "schema":{
        "type":"struct",
        "fields":[
            {
                "type":"string",
                "optional":false,
                "name":"io.debezium.data.Json",
                "version":1,
                "field":"payload"
            },
            {
                "type":"string",
                "optional":false,
                "field":"eventType"
            }
        ],
        "optional":false,
        "name":"dbserver1.course.outbox.Course.Value"
    },
    "payload":{
        "payload":"{\"name\":\"MSA 파헤치기\",\"posted\":true,\"courseId\":\"abe1a13c-a1f5-41a5-8c22-85fe434f358a\",\"teacherId\":\"7ead2ed1-77fa-4d70-bdfd-77e67c513bbd\",\"thumbnail\":\"msa_course.jpeg\",\"description\":\"MSA의 모든 것을 가르쳐 드립니다.\",\"registeredAt\":1677021515000,\"lastUpdatedAt\":1677031740000}",
        "eventType":"CoursePostedEvent"
    }
}

[코드] 메시지 Beautify 결과

사용자 서비스 예제

사용자 서비스는 강의 서비스와 다르게 두 개의 애그리거트 (Teacher, Student)를 가지고 있다. 따라서 두 개의 애그리거트가 발생시키는 이벤트는 각각 다른 토픽으로 발행된다. 이를 Outbox Event Router SMT를 이용하여 구현하는 예제이다.

사용자는 Teacher와 Student 타입으로 구분된다. 강의 중계 플랫폼은 Teacher 사용자가 등록되면 강사로 활동하기 위해 참고해야 할 문서를 메일로 보내준다. 그리고 Student 사용자가 등록되면 추천 강의를 메일로 보내준다. 즉, Teacher와 Student는 별도의 애그리거트로 각자 다른 이벤트와 정책을 가진다. 따라서 Teacher와 Student는 각각 다른 토픽으로 이벤트를 발행하는 것으로 설계한다.

[그림] 사용자 서비스 이벤트 아키텍처

사용자 서비스 DB

사용자 테이블

CREATE TABLE `user` (
  `user_id` varchar(36) NOT NULL,
  `name` varchar(45) NOT NULL,
  `email` varchar(45) NOT NULL,
  `type` varchar(16) NOT NULL,
  `registered_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `last_updated_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`user_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;

OUTBOX 테이블

CREATE TABLE `outbox` (
  `id` varchar(36) NOT NULL,
  `aggregatetype` varchar(255) NOT NULL,
  `aggregateid` varchar(255) NOT NULL,
  `type` varchar(255) NOT NULL,
  `payload` json NOT NULL,
  `timestamp` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;

애플리케이션

위에서 언급한 것 처럼, kafka에 메시지 발행 시 outbox.event.${routedByValue} 토픽에 발행된다. 이때 기본으로 ${routedByValue}에 aggregatetype 값으로 설정되기 때문에 선생님이냐 학생이냐에 따라 aggregatetype을 별도로 주면 각각 다른 토픽으로 메시지를 발행할 수 있다.

  • UserType
    사용자는 Teacher와 Student 타입으로 구분된다. 이를 enum으로 구현했다.
public enum UserType {
	TEACHER("Teacher"),
	STUDENT("Student");
	
	private String value;
	
    UserType(String value) {
    	this.value = value;
	}

	public String getKey() {
        return name();
    }
	
	public String getValue() {
		return this.value;
	}
}
  • User
    사용자에 대한 entity는 다음과 같이 정의한다.
@Getter(AccessLevel.PUBLIC)
@Builder
@AllArgsConstructor
@Table (name ="user") 
@Entity()
public class User {
	@Id
	@GeneratedValue(generator = "uuid2")
	@GenericGenerator(name = "uuid2", strategy = "uuid2")
	@Column(columnDefinition = "BINARY(16)")
	private String userId;
    
    ...
	
	@Enumerated(EnumType.STRING)
	@Column(nullable = false, length = 16)
	private UserType type;
    
    ...
    
  • UserCreatedEvent
    마지막으로 사용자 타입의 enum value 값을 aggreateType으로 설정한다.
public class UserCreatedEvent implements Outboxable  {
	private static ObjectMapper MAPPER = new ObjectMapper();
	
	private final String id;
	private final String aggreateType;
    private final JsonNode payload;
    
    public UserCreatedEvent(String id, String aggreateType, JsonNode payload) {
        this.id = id;
        this.aggreateType = aggreateType;
        this.payload = payload;
    }
    
    public static UserCreatedEvent of(User user) {
    	return new UserCreatedEvent(user.getUserId(), user.getType().getValue(), MAPPER.valueToTree(user));
	}

	...

	@Override
	public String getAggregateType() {
		return this.aggreateType;
	}
    
    ...

Debezium 커넥터 생성

사용자 서비스 DB에 대한 커넥터 생성을 위한 json이다. 이를 outbox-user.json으로 저장한다.

{
    "name":"user-outbox-connector",
    "config":{
        "connector.class":"io.debezium.connector.mysql.MySqlConnector",
        "tasks.max":"1",
        "database.hostname":"mysql",
        "database.port":"3306",
        "database.user":"mysqluser",
        "database.password":"mysqlpw",
        "database.server.id":"184054",
        "database.server.name":"dbserver1",
        "database.include.list": "user",
        "table.include.list":"user.outbox",
        "database.history.kafka.bootstrap.servers":"kafka:9092",
        "database.history.kafka.topic":"user.outbox",
        "database.allowPublicKeyRetrieval":"true",
        "transforms":"outbox",
        "transforms.outbox.type":"io.debezium.transforms.outbox.EventRouter",
        "transforms.outbox.table.fields.additional.placement": "type:envelope:eventType"
    }
}

그리고 커넥터를 생성해준다.

curl --location --request POST 'http://localhost:8083/connectors' --header 'Content-Type: application/json' -d @./outbox-user.json
{"name":"user-outbox-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","tasks.max":"1","database.hostname":"mysql","database.port":"3306","database.user":"mysqluser","database.password":"mysqlpw","database.server.id":"184054","database.server.name":"dbserver1","database.include.list":"user","table.include.list":"user.outbox","database.history.kafka.bootstrap.servers":"kafka:9092","database.history.kafka.topic":"user.outbox","database.allowPublicKeyRetrieval":"true","transforms":"outbox","transforms.outbox.type":"io.debezium.transforms.outbox.EventRouter","transforms.outbox.table.fields.additional.placement":"type:envelope:eventType","name":"user-outbox-connector"},"tasks":[],"type":"source"}

결과 확인

그리고 선생님과 학생 사용자를 API 호출을 하여 만든다.

# 선생님 사용자 추가
$ curl --location --request POST 'http://127.0.0.1:8080/users' \
--header 'Content-Type: application/json' \
--data-raw '{
    "name": "홍길동",
    "email": "gildong@sample.com",
    "type": "TEACHER"
}'
# 학생 사용자 추가
$ curl --location --request POST 'http://127.0.0.1:8080/users' \
--header 'Content-Type: application/json' \
--data-raw '{
    "name": "김학생",
    "email": "student@sample.com",
    "type": "STUDENT"
}'

애그리거트 Teacher에 대한 이벤트가 아래와 같이 outbox.event.Teacher 토픽으로 발행된다.

sh-5.1$ ./bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic outbox.event.Teacher --from-beginning --property print.headers=true
id:203288a3-4391-411e-8a92-58a6f17262f1	{"schema":{"type":"struct","fields":[{"type":"string","optional":false,"name":"io.debezium.data.Json","version":1,"field":"payload"},{"type":"string","optional":false,"field":"eventType"}],"optional":false,"name":"dbserver1.user.outbox.Teacher.Value"},"payload":{"payload":"{\"name\":\"홍길동\",\"type\":\"TEACHER\",\"email\":\"gildong@sample.com\",\"userId\":\"49c28987-8f91-44fc-89ca-8eb57c00444d\",\"registeredAt\":null,\"lastUpdatedAt\":null}","eventType":"UserCreatedEvent"}}

애그리거트 Student에 대한 이벤트는 아래와 같이 outbox.event.Student 토픽으로 발행된다.

sh-5.1$ ./bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic outbox.event.Student --from-beginning  --property print.headers=true
id:8c54bff6-0c46-42de-9fb8-b5ee4dc3e794	{"schema":{"type":"struct","fields":[{"type":"string","optional":false,"name":"io.debezium.data.Json","version":1,"field":"payload"},{"type":"string","optional":false,"field":"eventType"}],"optional":false,"name":"dbserver1.user.outbox.Teacher.Value"},"payload":{"payload":"{\"name\":\"김학생\",\"type\":\"STUDENT\",\"email\":\"student@sample.com\",\"userId\":\"e44cb206-bc9a-4c65-a0c4-e4da93314e98\",\"registeredAt\":null,\"lastUpdatedAt\":null}","eventType":"UserCreatedEvent"}}

마무리

이렇게 총 3편에 걸쳐서 MSA에서 이벤트를 발행하는 이유와 방법을 살펴보았다. 다음편에서는 이벤트 소싱에 대해 알아보려고 한다.

profile
개발하는 사람입니다.

0개의 댓글