Spring 과 Kafka를 연동하는 방법은 크게 두가지가 존재합니다.
이번 시간에는 spring-kafka 를 활용하여 kafka의 pub/sub를 구현해보며 pub,sub 시 발생할 수 있는 휴먼에러를 방지하는 코드를 작성해 봅시다. (이 예제는 기본적인 구현이며, 실제 프로젝트에서는 에러 처리, 재시도, 트랜잭션 등 추가적인 고려 사항이 필요합니다.)
implementation 'org.springframework.boot:spring-boot-starter'
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.kafka:spring-kafka' // spring-kafka
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.kafka:spring-kafka-test'
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
@Configuration
public class KafkaConfig {
@Value("${spring.kafka.bootstrap-servers}")
public String bootstrapAddress;
// Producer (Pub) 설정
@Bean
public ProducerFactory<String, Object> multiTypeProducerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
configProps.put(ProducerConfig.ACKS_CONFIG,
"all"); // 가장 안전하지만 가장 느린 설정입니다. 모든 ISR (In-Sync Replicas, 동기화된 복제본)에 레코드가 복제될 때까지 기다립니다. ISR 중 하나라도 살아있다면 레코드는 유실되지 않습니다.
configProps.put(ProducerConfig.RETRIES_CONFIG, 3);//재전송 횟수
configProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000);//재전송 간격
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, Object> multiTypeKafkaTemplate() {
return new KafkaTemplate<>(multiTypeProducerFactory());
}
...
}
@Configuration
public class KafkaConfig {
@Value("${spring.kafka.bootstrap-servers}")
public String bootstrapAddress;
...
// Consumer (sub) 설정
// Consumer에서 자동 형변환을 해주기 위한 설정
@Bean
public RecordMessageConverter multiTypeConverter() {
StringJsonMessageConverter converter = new StringJsonMessageConverter();
DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper();
typeMapper.setTypePrecedence(Jackson2JavaTypeMapper.TypePrecedence.TYPE_ID);
typeMapper.addTrustedPackages("*");
Map<String, Class<?>> mappings = new HashMap<>();
mappings.put("kafkaOrderCreatedPayload", KafkaOrderCreatedPayload.class);
mappings.put("kafkaOrderCancelPayload", KafkaOrderCancelPayload.class);
typeMapper.setIdClassMapping(mappings);
converter.setTypeMapper(typeMapper);
return converter;
}
@Bean
public ConsumerFactory<String, Object> multiTypeConsumerFactory() {
HashMap<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> multiTypeKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(multiTypeConsumerFactory());
factory.setRecordMessageConverter(multiTypeConverter());
return factory;
}
}
코드에 key, topic을 명시하여 휴먼 에러를 방지합니다.
public class KafkaTopicName {
public static final String ORDER_CREATED = "order-created.v1";
public static final String ORDER_CANCELED = "order-canceled.v1";
}
public interface KafkaBasePayload {
// get 이 prefix로 지정 되어 있지 않는 경우 key, topic도 Value에 포함되지 않음
String key();
String topic();
// get이 prefix로 지정될 경우 key, topic도 Value에 포함되어 Kafka에 전송됨
// String getKey();
// String getTopic();
}
public record KafkaOrderCancelPayload(
int orderId
) implements KafkaBasePayload {
@Override
public String key() {
return String.valueOf(this.orderId);
}
@Override
public String topic() {
return KafkaTopicName.ORDER_CANCELED;
}
}
public record KafkaOrderCreatedPayload(
int orderId,
int itemId
) implements KafkaBasePayload {
@Override
public String key() {
return String.valueOf(this.orderId);
}
@Override
public String topic() {
return KafkaTopicName.ORDER_CREATED;
}
}
@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaMultiProducer {
private final KafkaTemplate<String, Object> kafkaTemplate;
public <T extends KafkaBasePayload> void publish(T payload) {
kafkaTemplate.send(payload.topic(), payload.key(), payload);
log.info("kafka message published. topic: {}, key: {}, body: {}", payload.topic(),
payload.key(), payload);
}
}
@Component
@Slf4j
@KafkaListener(
id = "orderCanceled",
topics = {KafkaTopicName.ORDER_CANCELED}
)
public class KafkaOrderCanceledConsumer {
@KafkaHandler
public void consumer(KafkaOrderCancelPayload kafkaOrderCancelPayload) {
log.info("multiGroup Received: " + kafkaOrderCancelPayload);
}
@KafkaHandler(isDefault = true)
public void unknown(Object object) {
log.error("multiGroup Received unknown: " + object);
}
}
@Component
@Slf4j
@KafkaListener(
id = "orderCreated",
topics = {KafkaTopicName.ORDER_CREATED}
)
public class KafkaOrderCreatedConsumer {
@KafkaHandler
public void consumer(KafkaOrderCreatedPayload kafkaOrderCreatedPayload) {
log.info("multiGroup Received: " + kafkaOrderCreatedPayload);
}
@KafkaHandler(isDefault = true)
public void unknown(Object object) {
log.error("multiGroup Received unknown: " + object);
}
}
@RestController
@RequestMapping("/multi")
@RequiredArgsConstructor
public class SimpleMultiController {
private final KafkaMultiProducer kafkaMultiProducer;
@GetMapping("/create/{orderId}/{itemId}")
public ResponseEntity<String> create(@PathVariable("orderId") int orderId, @PathVariable("itemId") int itemId) {
KafkaOrderCreatedPayload payload = new KafkaOrderCreatedPayload(orderId, itemId);
kafkaMultiProducer.publish(payload);
return ResponseEntity.ok("ok");
}
@GetMapping("/cancel/{orderId}")
public ResponseEntity<String> cancel(@PathVariable("orderId") int orderId) {
KafkaOrderCancelPayload payload = new KafkaOrderCancelPayload(orderId);
kafkaMultiProducer.publish(payload);
return ResponseEntity.ok("ok");
}
}
@SpringBootTest
@EmbeddedKafka(partitions = 5,
topics = {
KafkaTopicName.ORDER_CREATED})
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@DisplayName(" Kafka test : ** 대기 시간이 존재하여 오래 걸림")
public class KafkaOrderCreatedConsumerTest {
@Autowired
KafkaTemplate<String, Object> template;
@Autowired
KafkaMultiProducer producer;
@SpyBean
KafkaOrderCreatedConsumer consumer;
@Captor
ArgumentCaptor<KafkaOrderCreatedPayload> kafkaOrderCreatedPayloadArgumentCaptor;
@Captor
ArgumentCaptor<Object> unknownCaptor;
// Topic, partition, offset을 사용하며 captor 할때 사용
// @Captor
// ArgumentCaptor<String> topicArgumentCaptor;
//
// @Captor
// ArgumentCaptor<Integer> partitionArgumentCaptor;
//
// @Captor
// ArgumentCaptor<Long> offsetArgumentCaptor;
@Nested
@DisplayName("KafkaTopic.ORDER_CREATED 테스트")
class KafkaTopic_Name_ORDER_CREATED {
@Test
@DisplayName("정상 처리")
public void success()
throws Exception {
int orderId = 1;
int itemId = 2;
KafkaOrderCreatedPayload sendPayload = new KafkaOrderCreatedPayload(orderId, itemId);
producer.publish(sendPayload);
verify(consumer, timeout(3000).times(1))
.consumer(kafkaOrderCreatedPayloadArgumentCaptor.capture());
KafkaOrderCreatedPayload kafkaOrderCreatedPayload = kafkaOrderCreatedPayloadArgumentCaptor.getValue();
assertEquals(kafkaOrderCreatedPayload.orderId(), 1);
assertEquals(kafkaOrderCreatedPayload.itemId(), itemId);
}
}
}