"Kafka와 통신을 한다"는 과정은 Kafka에 보낼 메시지를 만들어 보내고, 향후에 Kafka에서 전송한 메시지를 읽고 처리하는 것과 동일하다.
이때 메시지는 일전에 정리했던 내용처럼 Json과 같은 문자열인데, 이때 단순히 Json문자열을 전송하기위한 직렬화 작업을 진행해주는 것에 그치면 곤란하다.
유지보수 관점에서 특정 서비스에서 어떠한 형태 혹은 내용으로 직렬화를 하고, 이 직렬화를 진행할 Class나 정책정보(Enum), Payload, EventType 등을 어떻게 구성할지에 대한 내용도 정확히 이해하고 파악해야 한다.
Kafka와 통신하기 위해 기본적인 환경구성을 어떻게 진행해야 하는지, Event 객체와 Data Serializer를 중심으로 찬찬히 살펴보았다.
예를 들어, 게시글을 생성하였을때 인기글 집계를 위해 게시글 생성 이벤트를 만들겠다는 설계를 하였을때 해당 게시글 생성 이벤트를 생성하기 위한 데이터 객체, 즉 Payload 도메인 객체를 먼저 만들어야 한다.
@Getter
@Builder
@NoArgsConstructor
@AllArgsConstructor
/*
* 각 Event 별로 가지는 payload 정의
* */
public class ArticleCreatedEventPayload implements EventPayload {
private Long articleId;
private String title;
private String content;
private Long boardId;
private Long writerId;
private LocalDateTime createdAt;
private LocalDateTime modifiedAt;
private Long boardArticleCount;
}
이후의 과정을 이해를 돕기위해 pseudo code 형식으로 구성해보면 다음과 같다.
Event<EventPayload> event = Event.of(
UUID.randomUUID().getMostSignificantBits(), // eventId (혹은 DB 시퀀스)
EventType.ARTICLE_CREATED,
payload
);
// Kafka로 보내기 위해 JSON 직렬화
String json = event.toJson();
// Kafka Producer 전송
kafkaTemplate.send(event.getType().getTopic(), json);
이때 직렬화한 JSON 문자열은 바이트 배열로 Kafka 내부적으로 직렬화하여 데이터 로그 파일에 파티션 별로 붙는다.
Pub/Sub 모델에 따르면 Consumer는 본인이 구독하고 있는 topic에 대해 메시지를 읽는다.
@KafkaListener(topics = EventType.Topic.KUKE_BOARD_ARTICLE, groupId = "board-service")
public void consume(String message) {
Event<EventPayload> event = Event.fromJson(message);
switch (event.getType()) {
case ARTICLE_CREATED -> handleArticleCreated((ArticleCreatedEventPayload) event.getPayload());
case ARTICLE_UPDATED -> ...
}
}
이처럼, 위와 동일한 Event 도메인 객체의 책임 하에 Kafka로 전송하기 위한 Event 생성(of) 책임과 역직렬화(fromJson)이 동시에 위치한다.
일단 이걸 기억하고, Kafka로부터 읽어온 데이터 로그 내용(message)을 역직렬화(fromJson)하여 EventPayload에 맞는 Event Object를 만든다.
/*
* Kafka 통신을 위한 역직렬화(역직렬화 후 Json 정보에서 EventRaw 정보를 받아오고 이를 Event 객체화)
* Json -> Event
* */
public static Event<EventPayload> fromJson(String json) {
EventRaw eventRaw = DataSerializer.deserialize(json, EventRaw.class);
if (eventRaw == null) {
return null;
}
Event<EventPayload> event = new Event<>();
event.eventId = eventRaw.getEventId();
event.type = EventType.from(eventRaw.getType());
event.payload = DataSerializer.deserialize(eventRaw.getPayload(), event.type.getPayloadClass());
return event;
}
이때 fromJson 내부적으로 eventType Enum에서 구성한 매핑정보에 따라 type(topic) 및 EventTypePayload를 반환한다.
이에 대한 내용은 아래에서 자세히 살펴본다.
넓게 본다면, Event.fromJson(...) 내부 로직은
JSON 문자열에서 EventRaw 객체로 1차 변환한다.
이때 payload는 아직 단순 Object(deserializer로 ObjectMapper를 통해 역직렬화된 상태이다.
이후 EventRaw.type을 기반으로 EventType Enum에서 from(type)을 실행하여 해당 이벤트가 어떤 이벤트인지 확인한다.
EventType.payloadClass에 따라 payload를 최종적으로 역직렬화 (예: ArticleCreatedEventPayload.class)한다.
Event 객체는 event Id, type, payload를 구분하기 위해(특히 type과 payload를 구별하기 위함) 멤버변수로 해당 항목들을 구성하며, 이는 eventRaw 객체의 항목과 동일하다.
이 과정을 통해 Consumer는 JSON을 EventRaw 객체화, 이후 다시 Event<EventPayload>
객체로 역직렬화하여,
으로 이어지기까지, 각 책임을 두 부분으로 나누어 진행하여 안정적인 역직렬화를 의도하도록 한다.
이때 EventPayload 클래스를 구성할때 데이터 구성요소만 잘 정의해주면 되지 않을까 생각이 들겠지만, DTO변환을 명시적으로 정해주고 여러 이벤트를 생산해야 하는 Producer 도메인을 명확하게 유지관리한다는 관점에서 별도의 EventPayload Interface 구성을 권장한다.
실제로 아래와 같이 수많은 이벤트 payload 객체를 생성해주었는데,
이에 대한 타입을 지정해주기 위한 목적으로
public interface EventPayload {
}
와 같이 인터페이스를 별도 생성하여 사용한다.
@Getter
public class Event<T extends EventPayload> {
private Long eventId;
private EventType type;
private T payload;
이후 해당 인터페이스를 사용할때 Event 객체에서 T Generic을 활용하여 멤버변수의 인스턴스 타입을 명시해줄 수 있고, 명확한 기준에 따라 payload 클래스를 확장할 수 있는 기준과 확장성을 마련해준다.
Interface를 별도 생성해주는 것이 유지관리에 유리한 이유를 정리하면,
Event 클래스가 Event<T extends EventPayload>
형태로 정의되어 있으므로, Event 멤버변수(여기서는 payload)는 반드시 EventPayload의 구현체여야만 한다(인스턴스화하여 사용할 것이 아니기때문에 메타데이터로 활용할 수도 있음).
따라서, 해당 Event 객체에 엉뚱한 DTO나 엔티티가 들어가는 걸 방지할 수 있다.
실무에서 이벤트 로깅, 공통 유효성 검증, 모니터링 같은 작업을 payload 레벨에서 할 때가 많은데, 상위타입을 활용할 수 있다는 관점에서 확장성이나 공통처리에 유리하다.
즉, EventPayload를 상위 타입으로 두면, 모든 payload를 하나의 컬렉션에 담아 처리하거나, 공통 인터페이스 메서드를 강제할 수 있다.
단순히 제네릭 제약(T extends EventPayload)을 위한 것뿐 아니라, 실무적으로 이벤트 payload들의 통일성과 관리성을 확보하기 위한 장치라고도 볼 수 있겠다.
JSON을 상위 Object 객체로 직렬화 및 역직렬화하는 과정은 ObjectMapper에서 제공하는 기능을 활용한다.
@Slf4j
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class DataSerializer {
/*
* Kafka와 통신(이벤트를 주고 받을때) 시 필요한 util Module.
* (*Kafka로 전달할때 Json 직렬화 및 데이터 받아올때 역직렬화 필요).
* */
private static final ObjectMapper objectMapper = initialize();
/*
* Object Mapper
* - 시간 관련 직렬화 : Java Time Module
* - 역직렬화 시 정하지 않은 세팅 있을 경우 오류가 나지 않도록 설정
* */
private static ObjectMapper initialize() {
return new ObjectMapper()
.registerModule(new JavaTimeModule())
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
}
/*
* 데이터 역직렬화
* String type -> class
* */
public static <T> T deserialize(String data, Class<T> clazz) {
try {
return objectMapper.readValue(data, clazz);
} catch (JsonProcessingException e) {
/*
* error 발생 시 null 반환
* */
log.error("[DataSerializer.deserialize] data={}, clazz={}", data, clazz, e);
return null;
}
}
/*
* 데이터 역직렬화
* Object -> Type
* */
public static <T> T deserialize(Object data, Class<T> clazz) {
return objectMapper.convertValue(data, clazz);
}
/*
* 직렬화
* Object -> Json
* */
public static String serialize(Object object) {
try {
return objectMapper.writeValueAsString(object);
} catch (JsonProcessingException e) {
/*
* error 발생 시 null 반환
* */
log.error("[DataSerializer.serialize] object={}", object, e);
return null;
}
}
}
이때 직렬화 및 역직렬화는 Event 도메인에서 다방면 활용하므로, common util 클래스로 만들어서 사용하면 좋겠다.
이처럼 Kafka 환경을 구성할때 유지관리성을 확보하기 위한 몇가지 유의해야 할 점을 살펴보았다.
정리하면 다음과 같다.
유지관리에 유리할 Event 도메인 및 Payload 등의 지정방안에 대해 살펴보고 이해하였으니, 실제 구현을 통해 대규모 데이터를 Kafka와 통신하여 처리하기 위한 실제 애플리케이션 구현을 진행해보자.