11/24

졸용·2025년 11월 24일

TIL

목록 보기
120/144

🔹 Kafka 실전 적용 알아보기

Kafka란?

고속 대용량 로그/이벤트를 토픽 단위로 쌓아두고, 여러 서비스가 비동기로 구독해서 처리하는 분산 메시징/스트리밍 플랫폼.

조금 자세히 보면:

🔸 핵심 개념

  1. Broker

    • Kafka 서버 인스턴스 하나.
    • 여러 대를 띄워서 클러스터를 구성.
  2. Topic

    • 메시지를 쌓아두는 “채널” 이름.
    • 예: order-created, payment-completed, user-signup 등.
  3. Partition

    • Topic을 물리적으로 쪼개놓은 단위.
    • 병렬 처리(Consumer 여러 개)와 확장성(스케일 아웃)을 위해 존재.
    • 각 메시지는 파티션 안에서 순서를 가짐(Offset).
  4. Producer

    • 메시지를 “보내는 쪽” 애플리케이션.
    • KafkaTemplate, REST API 서비스 등.
  5. Consumer

    • 메시지를 “받아서 처리하는 쪽”.
    • 메시지 핸들링 로직을 가짐.
  6. Consumer Group

    • 같은 그룹에 속한 Consumer들이 하나의 Topic 파티션들을 나눠서 병렬 처리.
    • 그룹 단위로 “한 메시지는 그룹 내에서 딱 한 Consumer만” 처리.
  7. Offset

    • 파티션 내에서 메시지의 위치(일련 번호).
    • Consumer는 마지막으로 읽은 Offset을 기억해, 재시작 시 이어서 처리.

🔸 Kafka 왜 사용하는가?

  • 느슨한 결합: 주문 서비스 → 결제 서비스 → 알림 서비스 간을 동기 HTTP 호출 대신, Kafka Topic으로 느슨하게 연결.
  • 비동기 처리: 주문 요청에 바로 응답하고, 뒤에서 배송/알림/포인트 적립 처리.
  • 버퍼 역할: 트래픽이 몰려도 Kafka가 일단 적재하고, Consumer가 천천히 따라잡을 수 있음.
  • 재처리 가능: 특정 Offset부터 다시 읽어서 “replay” 가능 (이벤트 소싱/로그 분석에 좋음).


🔹 Spring Boot에서 Kafka 연동 – 실전 예시

“주문 생성 이벤트”를 Kafka로 발행하고, 다른 서비스가 그걸 구독해서 처리하는 시나리오로 생각해보았다.

로컬에서 Kafka가 떠 있다고 가정했을 때,

📌 시나리오

  • order-server : 주문 생성 API → Kafka 토픽에 OrderAfterCreate 이벤트 발행 (Producer)
  • delivery-server : Kafka OrderAfterCreate 구독 → 배송 생성 (Consumer)

🔸 1. 이벤트 DTO 정의

실제로는 공통 라이브러리 모듈(예: order-events-common)을 만들어서 두 서비스에서 같이 쓰는 게 베스트.

두 서비스 모두 implementation project(":order-events-common") 같은 식으로 의존성 추가해서 사용한다고 생각하면 됨.

package chill_logistics.delivery_server.infrastructure.kafka.dto;

import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Positive;
import java.time.LocalDateTime;
import java.util.UUID;
import lombok.AccessLevel;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;

@Getter
@NoArgsConstructor(access = AccessLevel.PROTECTED)
public class OrderAfterCreateV1 {

    @NotNull
    private UUID startHubId;

    @NotBlank
    private String startHubFullAddress;

    @NotNull
    private UUID endHubId;

    @NotBlank
    private String endHubFullAddress;

    @NotNull
    @Positive
    private Integer deliverySequenceNum;

    @NotBlank
    private String deliveryStatus;  // ENUM String

    @NotNull
    @Positive
    private Double expectedDistance;

    @NotNull
    private LocalDateTime expectedDeliveryDuration;

    @Builder
    private OrderAfterCreateV1(
        UUID startHubId,
        String startHubName,
        String startHubFullAddress,
        UUID endHubId,
        String endHubName,
        String endHubFullAddress,
        Integer deliverySequenceNum,
        String deliveryStatus,
        Double expectedDistance,
        LocalDateTime expectedDeliveryDuration
    ) {
        this.startHubId = startHubId;
        this.startHubFullAddress = startHubFullAddress;
        this.endHubId = endHubId;
        this.endHubFullAddress = endHubFullAddress;
        this.deliverySequenceNum = deliverySequenceNum;
        this.deliveryStatus = deliveryStatus;
        this.expectedDistance = expectedDistance;
        this.expectedDeliveryDuration = expectedDeliveryDuration;
    }
}

나중에 Consumer가 늘어날 수 있는 상황을 대비해 공통 DTO는 좋지 않다.

DTO는 따로 구성하는 것이 좋고, 필요한 데이터만 뽑아서 쓸 수 있다.
이 때, 데이터를 받을 때, 인프라 계층에서 데이터를 모두 받고 애플리케이션 계층에서 필요한 데이터만 뽑아서 써도 된다.

Kafka는 infrastructure 계층에 KafkaListner(Adapter역할), Consumer를 두고, application 계층에서 어댑터 사용.



🔸 2. 주문 서비스 (order-service) – Producer 역할

⭐ 수정 필요

2-1. 의존성 (order-service/build.gradle)

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.kafka:spring-kafka'
    implementation project(':order-events-common')
}

2-2. 설정 (order-service/src/main/resources/application.yml)

server:
  port: 8081

spring:
  application:
    name: order-service

  kafka:
    bootstrap-servers: localhost:9092

    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

value-serializer를 JsonSerializer로 설정해서 DTO를 JSON으로 자동 직렬화.


2-3. Producer 설정 (KafkaTemplate) – 선택적이지만 명시적으로

package com.example.order.config;

import com.example.events.OrderCreatedEvent;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.*;
import org.springframework.kafka.support.serializer.JsonSerializer;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, OrderCreatedEvent> orderProducerFactory() {

        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, OrderCreatedEvent> orderKafkaTemplate() {

        return new KafkaTemplate<>(orderProducerFactory());
    }
}

2-4. Kafka Producer 서비스

package com.example.order.kafka;

import com.example.events.OrderCreatedEvent;
import java.time.LocalDateTime;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Slf4j
@Service
@RequiredArgsConstructor
public class OrderEventProducer {

    private static final String TOPIC = "order-created";

    private final KafkaTemplate<String, OrderCreatedEvent> kafkaTemplate;

    public void publishOrderCreatedEvent(String orderId, Long userId, Long productId, Integer quantity) {

        OrderCreatedEvent event = OrderCreatedEvent.builder()
            .orderId(orderId)
            .userId(userId)
            .productId(productId)
            .quantity(quantity)
            .orderedAt(LocalDateTime.now())
            .build();

        log.info("[order-service] publishing event to Kafka: {}", event);

        kafkaTemplate.send(TOPIC, orderId, event)
            .whenComplete((result, ex) -> {
                if (ex != null) {
                    log.error("[order-service] failed to send event", ex);
                } else {
                    log.info("[order-service] event sent. topic={}, partition={}, offset={}",
                        result.getRecordMetadata().topic(),
                        result.getRecordMetadata().partition(),
                        result.getRecordMetadata().offset()
                    );
                }
            });
    }
}

2-5. 주문 API (Controller) – 주문 생성 후 이벤트 발행

package com.example.order.presentation;

import com.example.order.kafka.OrderEventProducer;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.*;

@RestController
@RequiredArgsConstructor
@RequestMapping("/orders")
public class OrderController {

    private final OrderEventProducer orderEventProducer;

    @PostMapping
    public String createOrder(@RequestParam Long userId,
                              @RequestParam Long productId,
                              @RequestParam Integer quantity) {

        // 1. 실제로는 여기서 DB에 주문 저장
        String orderId = "ORD-" + System.currentTimeMillis();  // 예시용

        // 2. Kafka 이벤트 발행
        orderEventProducer.publishOrderCreatedEvent(orderId, userId, productId, quantity);

        // 3. 클라이언트 응답
        return "Order created: " + orderId;
    }
}

이 서비스는 8081포트에서 HTTP 요청 처리 + Kafka Producer 역할만 담당.



3. 알림 서비스 (delivery-server) – Consumer 역할

3-1. 의존성 (delivery-server/build.gradle)

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter'
    implementation 'org.springframework.kafka:spring-kafka'
    implementation project(':order-events-common')
}

웹 API가 굳이 필요 없으면 spring-boot-starter-web 없이, 배치/워커 앱처럼 돌아가도 됨.


3-2. 설정 (delivery-server/src/main/resources/application.yml)

server:
  port: 19096

spring:
  application:
    name: delivery-server

  kafka:
    bootstrap-servers: localhost:9092

    consumer:
      group-id: notification-service-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      auto-offset-reset: earliest

    listener:
      ack-mode: record

여기서 선택지가 두 가지 있다.

  1. 이 Consumer만 커스터마이징 → Java 설정 유지할 거라면, yaml에서 최소로만 두고, 진짜 동작은 KafkaConsumerConfig에 맡긴다.
    예를 들어 yaml는 단순히 broker 주소만:

    spring:
     kafka:
       bootstrap-servers: localhost:9092
    

    그리고 나머지 consumer 옵션, trusted packages, JsonDeserializer 타입 등은 전부 자바 config에서 관리.

  2. Java Config를 더 단순하게 하고, yml 쪽을 더 적극적으로 쓰고 싶다면:

    spring:
     kafka:
       bootstrap-servers: localhost:9092
    
       consumer:
         group-id: delivery-server-group
         key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
         value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
         properties:
           spring.json.trusted.packages: chill_logistics.delivery_server.infrastructure.kafka.dto
    

    그리고 자바 코드는 훨씬 간단하게:

    @Bean
    public ConsumerFactory<String, OrderAfterCreateV1> orderConsumerFactory() {
    
       JsonDeserializer<OrderAfterCreateV1> deserializer =
           new JsonDeserializer<>(OrderAfterCreateV1.class, false);
    
       deserializer.addTrustedPackages(
           "chill_logistics.delivery_server.infrastructure.kafka.dto"
       );
    
       return new DefaultKafkaConsumerFactory<>(
           new HashMap<>(),
           new StringDeserializer(),
           deserializer
       );
    }
    

필자는 1번의 방법으로 선택했다. 이유는,

다른 컨슈머가 추가될 경우에도

  • 이 DTO만 받는 컨슈머 → 이 Factory (orderKafkaListenerContainerFactory)
  • 다른 DTO → 별도 Factory

이렇게 확장성이 좋아진다고 판단했기 때문이다.


3-3. Consumer 설정 (Listener Container Factory)

package chill_logistics.delivery_server.infrastructure.kafka.config;

import chill_logistics.delivery_server.infrastructure.kafka.dto.OrderAfterCreateV1;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, OrderAfterCreateV1> orderConsumerFactory() {

        // JSON → OrderAfterCreateV1 역직렬화를 위한 Deserializer
        JsonDeserializer<OrderAfterCreateV1> deserializer =
            new JsonDeserializer<>(OrderAfterCreateV1.class, false);

        // Kafka 메시지 역직렬화 시 허용할 패키지를 명시적으로 지정
        deserializer.addTrustedPackages(
            "chill_logistics.delivery_server.infrastructure.kafka.dto"
        );

        // Kafka Consumer 설정 값
        Map<String, Object> properties = new HashMap<>();

        // Kafka Broker 주소 (Docker Compose에서 기본적으로 localhost:9092로 띄움)
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // Consumer Group ID (같은 Group으로 묶인 Consumer들은 같은 메시지를 중복 처리하지 않음)
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "delivery-server-group");

        // 메시지 Key 역직렬화 방식
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        // 메시지 Value 역직렬화 방식 (JsonDeserializer 사용)
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

        // 위 설정값 + Key/Value Deserializer를 기반으로 실제 Consumer 인스턴스를 만들어 KafkaListener에 제공
        return new DefaultKafkaConsumerFactory<>(
            properties,
            new StringDeserializer(),   // Key Deserializer (String)
            deserializer                // Value Deserializer (OrderAfterCreateV1)
        );
    }

    // @KafkaListener가 사용할 Listener 컨테이너 생성 (ConsumerFactory를 주입하여 실제 Kafka Consumer 동작을 구성)
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderAfterCreateV1>
    orderKafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, OrderAfterCreateV1> factory =
            new ConcurrentKafkaListenerContainerFactory<>();

        // ConsumerFactory 설정
        factory.setConsumerFactory(orderConsumerFactory());

        return factory;
    }
}

3-4. 배송 생성 서비스 (비즈니스 로직)

⭐ Kafka는 infrastructure 계층에 KafkaListner(Adapter역할), Consumer를 두고, application 계층에서 어댑터 사용하는 방식으로 수정 필요.

package chill_logistics.delivery_server.application;

import chill_logistics.delivery_server.infrastructure.kafka.dto.OrderAfterCreateV1;
import org.springframework.stereotype.Service;

@Service
public class DeliveryService {

    public void createDelivery(OrderAfterCreateV1 message) {

    }
}

3-5. Kafka Consumer (@KafkaListener)

⭐ Kafka는 infrastructure 계층에 KafkaListner(Adapter역할), Consumer를 두고, application 계층에서 어댑터 사용하는 방식으로 수정 필요.

package chill_logistics.delivery_server.infrastructure.kafka;

import chill_logistics.delivery_server.infrastructure.kafka.dto.OrderAfterCreateV1;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class OrderAfterCreateListener {

    @KafkaListener(
        topics = "order-after-create",
        containerFactory = "orderKafkaListenerContainerFactory"
    )
    public void listen(OrderAfterCreateV1 message) {

        log.info("메시지 수신 OrderAfterCreateV1: {}", message);
        
        // 여기서 이제 service 호출
    }
}

이 서비스는 HTTP 요청 없이 백그라운드 워커처럼 돌아가면서 Kafka 메시지만 소비해도 되고, 필요하면 조회 API를 추가해서 “알림 이력 조회” 같은 기능도 만들 수 있다.



🔹로컬에서 전체 흐름 테스트 순서 요약

  1. Kafka/Zookeeper 띄우기

    • docker-compose든, 로컬 설치든 상관 없음
  2. delivery-server 실행 (19096)

    • Kafka Consumer가 먼저 떠 있어도, 나중에 떠도 OK (Kafka가 메시지를 보관함).
  3. order-server 실행 (19095)

  4. 주문 생성 요청 보내기

curl -X POST "http://localhost:8081/orders?userId=1&productId=100&quantity=2"
  1. 로그에서 확인:

    • order-server : Kafka로 event 발행 로그
    • delivery-server : Kafka에서 event 소비 + 배송 생성 로그


6. 정리

  • order-server: “주문 생성”에 집중 + 이벤트 발행 (Producer)
  • delivery-server: “배송 생성”에 집중 + 이벤트 소비 (Consumer)
  • 둘은 Kafka Topic을 통해서만 연결되어 있어서, 서로의 URL/구현에 직접 의존하지 않음
    ➡️ 느슨한 결합 + 확장성
profile
꾸준한 공부만이 답이다

0개의 댓글