Spring Kafka 기초 예제

itbuddy·2024년 9월 20일

Kafka

목록 보기
3/4

Spring 과 Kafka를 연동하는 방법은 크게 두가지가 존재합니다.

  • spring-kafka만 활용
  • spring-cloud-stream + spring-cloud-stream-binder-kafka

이번 시간에는 spring-kafka 를 활용하여 kafka의 pub/sub를 구현해보며 pub,sub 시 발생할 수 있는 휴먼에러를 방지하는 코드를 작성해 봅시다. (이 예제는 기본적인 구현이며, 실제 프로젝트에서는 에러 처리, 재시도, 트랜잭션 등 추가적인 고려 사항이 필요합니다.)

github


Gradle 설정

    implementation 'org.springframework.boot:spring-boot-starter'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.kafka:spring-kafka' // spring-kafka
    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.projectlombok:lombok'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.kafka:spring-kafka-test'
    testRuntimeOnly 'org.junit.platform:junit-platform-launcher'

Kafaka Procuder, Consumer 설정 (Pub/Sub)

KafkaConfig Producer (Pub) 설정

@Configuration
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    public String bootstrapAddress;

// Producer (Pub) 설정
    @Bean
    public ProducerFactory<String, Object> multiTypeProducerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        configProps.put(ProducerConfig.ACKS_CONFIG,
            "all"); // 가장 안전하지만 가장 느린 설정입니다. 모든 ISR (In-Sync Replicas, 동기화된 복제본)에 레코드가 복제될 때까지 기다립니다. ISR 중 하나라도 살아있다면 레코드는 유실되지 않습니다.
        configProps.put(ProducerConfig.RETRIES_CONFIG, 3);//재전송 횟수
        configProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000);//재전송 간격
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, Object> multiTypeKafkaTemplate() {
        return new KafkaTemplate<>(multiTypeProducerFactory());
    }
...

}

KafkaConfig Consumer (Sub)설정

@Configuration
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    public String bootstrapAddress;

...
// Consumer (sub) 설정

// Consumer에서 자동 형변환을 해주기 위한 설정
 	@Bean
    public RecordMessageConverter multiTypeConverter() {
        StringJsonMessageConverter converter = new StringJsonMessageConverter();
        DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper();
        typeMapper.setTypePrecedence(Jackson2JavaTypeMapper.TypePrecedence.TYPE_ID);
        typeMapper.addTrustedPackages("*");
        Map<String, Class<?>> mappings = new HashMap<>();
        mappings.put("kafkaOrderCreatedPayload", KafkaOrderCreatedPayload.class);
        mappings.put("kafkaOrderCancelPayload", KafkaOrderCancelPayload.class);
        typeMapper.setIdClassMapping(mappings);
        converter.setTypeMapper(typeMapper);
        return converter;
    }

    @Bean
    public ConsumerFactory<String, Object> multiTypeConsumerFactory() {
        HashMap<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> multiTypeKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(multiTypeConsumerFactory());
        factory.setRecordMessageConverter(multiTypeConverter());
        return factory;
    }
 }

Kafka Producer,Consumer 동작에서 휴먼 에러를 막기위한 구조 작성

컨셉

핵심 요소

  • KafkaTopicName : Kafka topic을 static 필드로 가지고 있는 객체
  • KafkaBasePayload : KafkaPayload 객체 작성시 반드시 implement 해야할 interface

원리

  1. KafkaBasePayload 인터페이스를 통해 메시지 발행 시 topic과 key 정보를 명확히 명시하여 휴먼 에러를 줄입니다.
  2. KafkaTopicName으로 topic 이름을 관리하여 중복이나 오타를 방지합니다.
  3. 각 Payload에서 KafkaBasePayload의 메서드를 구현하여 사용합니다.

결과

코드에 key, topic을 명시하여 휴먼 에러를 방지합니다.

KafkaTopicName

public class KafkaTopicName {
    public static final String ORDER_CREATED = "order-created.v1";
    public static final String ORDER_CANCELED = "order-canceled.v1";
}

KafkaBasePayload

public interface KafkaBasePayload {
    // get 이 prefix로 지정 되어 있지 않는 경우 key, topic도 Value에 포함되지 않음
    String key();
    String topic();

    // get이 prefix로 지정될 경우 key, topic도 Value에 포함되어 Kafka에 전송됨
//    String getKey();
//    String getTopic();
}

KafkaOrderCancelPayload

public record KafkaOrderCancelPayload(
    int orderId
) implements KafkaBasePayload {

    @Override
    public String key() {
        return String.valueOf(this.orderId);
    }

    @Override
    public String topic() {
        return KafkaTopicName.ORDER_CANCELED;
    }
}

KafkaOrderCreatedPayload

public record KafkaOrderCreatedPayload(
    int orderId,
    int itemId
) implements KafkaBasePayload {

    @Override
    public String key() {
        return String.valueOf(this.orderId);
    }

    @Override
    public String topic() {
        return KafkaTopicName.ORDER_CREATED;
    }
}

Producer


@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaMultiProducer {

    private final KafkaTemplate<String, Object> kafkaTemplate;

    public <T extends KafkaBasePayload> void publish(T payload) {
        kafkaTemplate.send(payload.topic(), payload.key(), payload);
        log.info("kafka message published. topic: {}, key: {}, body: {}", payload.topic(),
            payload.key(), payload);
    }


}

Consumer

@Component
@Slf4j
@KafkaListener(
    id = "orderCanceled",
    topics = {KafkaTopicName.ORDER_CANCELED}
)
public class KafkaOrderCanceledConsumer {

    @KafkaHandler
    public void consumer(KafkaOrderCancelPayload kafkaOrderCancelPayload) {
        log.info("multiGroup Received: " + kafkaOrderCancelPayload);
    }

    @KafkaHandler(isDefault = true)
    public void unknown(Object object) {
        log.error("multiGroup Received unknown: " + object);
    }


}


@Component
@Slf4j
@KafkaListener(
    id = "orderCreated",
    topics = {KafkaTopicName.ORDER_CREATED}
)
public class KafkaOrderCreatedConsumer {


    @KafkaHandler
    public void consumer(KafkaOrderCreatedPayload kafkaOrderCreatedPayload) {
        log.info("multiGroup Received: " + kafkaOrderCreatedPayload);
    }

    @KafkaHandler(isDefault = true)
    public void unknown(Object object) {
        log.error("multiGroup Received unknown: " + object);
    }


}


Controller (Service Layer 생략)

@RestController
@RequestMapping("/multi")
@RequiredArgsConstructor
public class SimpleMultiController {

    private final KafkaMultiProducer kafkaMultiProducer;

    @GetMapping("/create/{orderId}/{itemId}")
    public ResponseEntity<String> create(@PathVariable("orderId") int orderId, @PathVariable("itemId") int itemId) {
        KafkaOrderCreatedPayload payload = new KafkaOrderCreatedPayload(orderId, itemId);
        kafkaMultiProducer.publish(payload);
        return ResponseEntity.ok("ok");
    }

    @GetMapping("/cancel/{orderId}")
    public ResponseEntity<String> cancel(@PathVariable("orderId") int orderId) {

        KafkaOrderCancelPayload payload = new KafkaOrderCancelPayload(orderId);
        kafkaMultiProducer.publish(payload);
        return ResponseEntity.ok("ok");
    }

}

테스트 코드 ( 실패케이스는 다음에 작성 )

@SpringBootTest
@EmbeddedKafka(partitions = 5,
    topics = {
        KafkaTopicName.ORDER_CREATED})
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@DisplayName(" Kafka test : ** 대기 시간이 존재하여 오래 걸림")
public class KafkaOrderCreatedConsumerTest {

    @Autowired
    KafkaTemplate<String, Object> template;

    @Autowired
    KafkaMultiProducer producer;

    @SpyBean
    KafkaOrderCreatedConsumer consumer;

    @Captor
    ArgumentCaptor<KafkaOrderCreatedPayload> kafkaOrderCreatedPayloadArgumentCaptor;

    @Captor
    ArgumentCaptor<Object> unknownCaptor;

    // Topic, partition, offset을 사용하며 captor 할때 사용
//    @Captor
//    ArgumentCaptor<String> topicArgumentCaptor;
//
//    @Captor
//    ArgumentCaptor<Integer> partitionArgumentCaptor;
//
//    @Captor
//    ArgumentCaptor<Long> offsetArgumentCaptor;


    @Nested
    @DisplayName("KafkaTopic.ORDER_CREATED 테스트")
    class KafkaTopic_Name_ORDER_CREATED {

        @Test
        @DisplayName("정상 처리")
        public void success()
            throws Exception {
            int orderId = 1;
            int itemId = 2;

            KafkaOrderCreatedPayload sendPayload = new KafkaOrderCreatedPayload(orderId, itemId);

            producer.publish(sendPayload);

            verify(consumer, timeout(3000).times(1))
                .consumer(kafkaOrderCreatedPayloadArgumentCaptor.capture());

            KafkaOrderCreatedPayload kafkaOrderCreatedPayload = kafkaOrderCreatedPayloadArgumentCaptor.getValue();

            assertEquals(kafkaOrderCreatedPayload.orderId(), 1);
            assertEquals(kafkaOrderCreatedPayload.itemId(), itemId);

        }

    }


}

Ref

Intro to Apache Kafka with Spring

profile
프론트도 조금 아는 짱구 같은 서버 프로그래머

0개의 댓글