마이크로서비스 아키텍쳐 환경에서 서로 다른 도메인들간 분산 이벤트 처리를 위해 Kafka를 사용하는 경우가 많은데, 실제 운영하는 Kafka를 연동한 후 테스트를 진행하는 경우를 종종 목격한 적이 있습니다.
이는 외부 afka에 의존해 테스트를 진행하는 것이기 때문에 테스트의 안정성을 떨어뜨릴 수 있고,
외부 Kafka의 문제로 인해 테스트 구동이 제한될 수도 있다고 생각합니다.
Spring Boot에서 외부 Kafka 서버에 의존하지 않는 안정적이고 독립적인 통합 테스트하는 방법에 대해서 알려드리고자 합니다.
우선 Spring에서 Kafka를 사용하기 위해 Kafka 관련 의존성을 추가해줍니다.
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.7.2</version>
</dependency>
그 후 Kafka Test 관련 의존성을 추가해줍니다.
이때 테스트 환경에서만 의존성을 유지시킬 것이기 때문에 scope를 test로 세팅해줍니다.
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<version>2.6.3.RELEASE</version>
<scope>test</scope>
</dependency>
예시 코드는 총 [6개]입니다.
위 코드를 기준으로 테스트 코드를 작성해보았습니다.
테스트 코드의 결과는 성공적으로 수행됩니다.
application.properties
------------------------------------------------------
spring:
kafka:
producer:
bootstrap-servers: localhost:9092
consumer:
auto-offset-reset: earliest
bootstrap-servers: localhost:9092
enable-auto-commit: false
listener:
ack-mode: manual
KafkaProducerConfig.class
------------------------------------------------------
@Configuration
public class KafkaProducerConfig {
@Value("${spring.kafka.producer.bootstrap-servers}")
private String BOOTSTRAP_SERVERS;
@Bean
public ProducerFactory<String, String> factory(){
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(props);
}
@Bean
KafkaTemplate<String, String> kafkaTemplate(){
return new KafkaTemplate<>(factory());
}
}
KafkaConsumerConfig.class
------------------------------------------------------
@Configuration
public class KafkaConsumerConfig {
@Value("${spring.kafka.consumer.bootstrap-servers}")
private String BOOTSTRAP_ADDRESS;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String AUTO_OFFSET_RESET;
@Value("${spring.kafka.consumer.enable-auto-commit}")
private boolean AUTO_COMMIT;
@Bean
ConsumerFactory<String,String> consumerFactory(){
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_ADDRESS);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, AUTO_COMMIT);
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new StringDeserializer());
}
@Bean
ConcurrentKafkaListenerContainerFactory<String, String> containerFactory(){
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
KafkaProducer.class
------------------------------------------------------
@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
public void send(String topic, String payload) {
kafkaTemplate.send(topic, payload);
}
}
KafkaConsumer.class
------------------------------------------------------
@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaConsumer {
private final ObjectMapper objectMapper;
private List<RegisteredPostEvent> eventRepo = new ArrayList<>();
@KafkaListener(topics = "testTopic", groupId = "testGroup")
protected void consume(@Payload String payload, Acknowledgment acknowledgment) throws Exception {
log.info("recive event : {}", payload);
RegisteredPostEvent event = objectMapper.readValue(payload, RegisteredPostEvent.class);
eventRepo.add(event);
// Process
acknowledgment.acknowledge();
}
public List<RegisteredPostEvent> getEventRepo() {
return eventRepo;
}
}
EmbeddedKafkaTest.class
------------------------------------------------------
@SpringBootTest
@EmbeddedKafka(partitions = 3,
brokerProperties = {
"listeners=PLAINTEXT://localhost:9092"
},
ports = { 9092 })
class EmbeddedKafkaIntegrationTest {
@Autowired
KafkaProducer producer;
@Autowired
ObjectMapper objectMapper;
@Autowired
private KafkaConsumer kafkaConsumer;
@Test
void test() throws Exception {
// given
RegisteredPostEvent event = RegisteredPostEvent.idOf(1L);
String payload = objectMapper.writeValueAsString(event);
// when
producer.send("testTopic", payload);
Thread.sleep(2000);
// then
assertNotEquals(0, kafkaConsumer.getEventRepo().size());
}
}
위 코드중에 주제에 맞는 가장 핵심이 되는 부분은 EmbeddedKafkaIntegrationTest 클래스에 선언되어 있는 EmbeddedKafka 어노테이션입니다.
위 어노테이션은 spring-kafka-test에 존재하는 어노테이션으로 해당 어노테이션을 클래스 레벨에 선언함으로써 해당 테스트는 Embedded Kafka가 세팅된 환경에서 테스트하게 됩니다.
https://forum.ixbt.com/users.cgi?id=info:%3E1889367