12/1

졸용·2025년 12월 1일

TIL

목록 보기
125/144

Kafka + FeignClient 적용 연습

delivery-server가 Consumer인 상황의 적용 연습이다.

🔹 Kafka

🔸 delivery-server: build.gradle

// Kafka
implementation 'org.springframework.kafka:spring-kafka'
testImplementation 'org.springframework.kafka:spring-kafka-test'

🔸delivery-server: application.yml

spring:
  kafka:
    bootstrap-servers: localhost:9092

🔸 delivery-server: KafkaConsumerConfig

package chill_logistics.delivery_server.infrastructure.config;

import chill_logistics.delivery_server.infrastructure.kafka.dto.OrderAfterCreateV1;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, OrderAfterCreateV1> orderConsumerFactory() {

        // JSON → OrderAfterCreateV1 역직렬화를 위한 Deserializer
        JsonDeserializer<OrderAfterCreateV1> deserializer =
            new JsonDeserializer<>(OrderAfterCreateV1.class, false);

        // Kafka 메시지 역직렬화 시 허용할 패키지를 명시적으로 지정
        deserializer.addTrustedPackages(
            "chill_logistics.delivery_server.infrastructure.kafka.dto"
        );

        // Kafka Consumer 설정 값
        Map<String, Object> properties = new HashMap<>();

        // Kafka Broker 주소 (Docker Compose에서 기본적으로 localhost:9092로 띄움)
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // Consumer Group ID (같은 Group으로 묶인 Consumer들은 같은 메시지를 중복 처리하지 않음)
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "delivery-server-group");

        // 메시지 Key 역직렬화 방식
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        // 메시지 Value 역직렬화 방식 (JsonDeserializer 사용)
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

        // 위 설정값 + Key/Value Deserializer를 기반으로 실제 Consumer 인스턴스를 만들어 KafkaListener에 제공
        return new DefaultKafkaConsumerFactory<>(
            properties,
            new StringDeserializer(),   // Key Deserializer (String)
            deserializer                // Value Deserializer (OrderAfterCreateV1)
        );
    }

    // @KafkaListener가 사용할 Listener 컨테이너 생성 (ConsumerFactory를 주입하여 실제 Kafka Consumer 동작을 구성)
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderAfterCreateV1>
    orderKafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, OrderAfterCreateV1> factory =
            new ConcurrentKafkaListenerContainerFactory<>();

        // ConsumerFactory 설정
        factory.setConsumerFactory(orderConsumerFactory());

        return factory;
    }
}

🔸 delivery-server: OrderAfterCreateV1

package chill_logistics.delivery_server.infrastructure.kafka.dto;

import java.time.LocalDateTime;
import java.util.UUID;

public record OrderAfterCreateV1(
    UUID orderId,
    UUID startHubId,
    UUID endHubId,
    UUID receiverFirmId,
    String receiverFirmFullAddress,
    String receiverFirmOwnerName,
    String requestNote,
    String productName,
    int productQuantity,
    LocalDateTime orderCreatedAt
) {}


🔹 FeignClient

🔸 delivery-server: build.gradle

// OpenFeign
implementation 'org.springframework.cloud:spring-cloud-starter-openfeign'

🔸 delivery-server: yml

spring:
  kafka:
    bootstrap-servers: localhost:9092

clients:
  hub:
    url: http://hub-server:19093

feign:
  client:
    config:
      default:
        connectTimeout: 2000
        readTimeout: 3000
        loggerLevel: basic

🔸 delivery-server: HubForDeliveryResponseV1

package chill_logistics.delivery_server.infrastructure.client.dto;

import java.util.UUID;

public record HubForDeliveryResponseV1(
    UUID hubId,
    String hubName,
    String hubFullAddress
) {}

🔸 delivery-server: FeignConfig

package chill_logistics.delivery_server.infrastructure.config;

import feign.Logger;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FeignConfig {

    @Bean
    public Logger.Level feignLoggerLevel() {

        return Logger.Level.BASIC;
    }
}

🔸 delivery-server: HubClient

package chill_logistics.delivery_server.infrastructure.client;

import chill_logistics.delivery_server.infrastructure.client.dto.HubForDeliveryResponseV1;
import java.util.UUID;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;

@FeignClient(
    name = "hubClient",
    url = "${clients.hub.url}",
    configuration = chill_logistics.delivery_server.infrastructure.config.FeignConfig.class
)
public interface HubClient {

    @GetMapping("/v1/internal/hubs/{hubId}")
    HubForDeliveryResponseV1 getHub(@PathVariable("hubId") UUID hubId);
}

🔸 delivery-server: DeliveryService

package chill_logistics.delivery_server.application;

import chill_logistics.delivery_server.domain.entity.DeliveryStatus;
import chill_logistics.delivery_server.domain.entity.FirmDelivery;
import chill_logistics.delivery_server.domain.entity.HubDelivery;
import chill_logistics.delivery_server.domain.repository.FirmDeliveryRepository;
import chill_logistics.delivery_server.domain.repository.HubDeliveryRepository;
import chill_logistics.delivery_server.infrastructure.client.HubClient;
import chill_logistics.delivery_server.infrastructure.client.dto.HubForDeliveryResponseV1;
import chill_logistics.delivery_server.infrastructure.kafka.dto.OrderAfterCreateV1;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Slf4j
@Service
@RequiredArgsConstructor
public class DeliveryService {

    private final HubDeliveryRepository hubDeliveryRepository;
    private final FirmDeliveryRepository firmDeliveryRepository;
    private final HubClient hubClient;

    /* [허브 배송 생성 메서드]
     * Kafka 메시지로 order 정보 + FeignClient로 hub 정보 받아와서 허브 배송 생성
     */
    @Transactional
    public void createHubDelivery(OrderAfterCreateV1 message) {

        log.info("[허브 배송 생성 시작] - orderId={}", message.orderId());

        // 1. Hub 서비스에서 허브 정보 조회 (Feign)
        HubForDeliveryResponseV1 startHub = hubClient.getHub(message.startHubId());
        HubForDeliveryResponseV1 endHub = hubClient.getHub(message.endHubId());

        // 2. 초기 배송 상태 & 배송 순서 셋팅
        DeliveryStatus deliveryStatus = DeliveryStatus.WAITING_FOR_HUB;
        // TODO: 배송순서 로직 수정 필요
        int deliverySequenceNum = 1;

        // 3. HubDelivery 엔티티 생성
        HubDelivery hubDelivery = HubDelivery.createFrom(
            message,
            startHub.hubName(),
            startHub.hubFullAddress(),
            endHub.hubName(),
            endHub.hubFullAddress(),
            deliveryStatus,
            deliverySequenceNum
        );

        // 4. 허브 배송 저장
        HubDelivery savedHubDelivery = hubDeliveryRepository.save(hubDelivery);

        log.info("[허브 배송 생성 완료] - hubDeliveryId={}, orderId={}",
            savedHubDelivery.getId(), savedHubDelivery.getOrderId());
    }

    /* [업체 배송 생성 메서드]
     * Kafka 메시지로 order 정보 받아와서 업체 배송 생성
     */
    @Transactional
    public void createFirmDelivery(OrderAfterCreateV1 message) {

        log.info("[업체 배송 생성 시작] - orderId={}", message.orderId());

        // 1. 초기 배송 상태 & 배송 순서 셋팅
        DeliveryStatus deliveryStatus = DeliveryStatus.MOVING_TO_FIRM;
        int deliverySequenceNum = 2;

        // 2. FirmDelivery 엔티티 생성
        FirmDelivery firmDelivery = FirmDelivery.createFrom(
            message,
            deliveryStatus,
            deliverySequenceNum
        );

        // 3. 업체 배송 저장
        FirmDelivery savedFirmDelivery = firmDeliveryRepository.save(firmDelivery);

        log.info("[업체 배송 생성 완료] - firmDeliveryId={}, orderId={}",
            savedFirmDelivery.getId(), savedFirmDelivery.getOrderId());
    }

    /* [전체 배송 생성]
     * 허브 배송 + 업체 배송 = 전체 배송 생성
     */
    @Transactional
    public void createDelivery(OrderAfterCreateV1 message) {

        log.info("[배송 생성 시작] - orderId={}", message.orderId());

        createHubDelivery(message);
        createFirmDelivery(message);

        log.info("[배송 생성 완료] - orderId={}", message.orderId());
    }
}

🔸 delivery-server: DeliveryController

package chill_logistics.delivery_server.presentation;

import chill_logistics.delivery_server.application.DeliveryService;
import chill_logistics.delivery_server.infrastructure.kafka.dto.OrderAfterCreateV1;
import lib.entity.BaseStatus;
import lib.web.response.BaseResponse;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
@RequiredArgsConstructor
@RequestMapping("/v1/delivery")
public class DeliveryController {

    private final DeliveryService deliveryService;

    /**
     * [배송 생성]
     *
     * @param message 주문 정보가 담긴 Kafka 메시지
     * @return status CREATED 반환
     */
    @PostMapping
    public BaseResponse<Void> createDelivery(OrderAfterCreateV1 message) {

        deliveryService.createDelivery(message);

        return BaseResponse.ok(BaseStatus.CREATED);
    }
}
profile
꾸준한 공부만이 답이다

0개의 댓글