
오늘은 Kafka에 대해 기록해보자.
일단 Kafka에 대해서 말하기 전 내가 사용하려 했던 Event Driven에 대해 간단히 알아보자.
간단하게 분산된 시스템에서 이벤트를 생성(발행)하고 발행된 이벤트를 수신자에게 전송하는 구조로 수신자는 그 이벤트를 처리하는 방식의 아키텍처이다.
내가 구상한 방식은
Api gateway와 Service discovery는 기존과 같이 미적용
기존 프록시를 이용하여 웹서버 구성 후 로드밸런싱을 이용 및 라우팅
이벤트 드리븐과 CQRS를 병합하여 구성
CUD는 서비스간 통신 없이 Event 기반으로 kafka를 이용하여 데이터 동기화 시킨다.
Http 통신을 이용해서도 비동기 통신은 가능하지만 느슨한 결합 목적에 맞지 않고 속도와 가용성면에서 상당히 떨어지기에 대부분 이용하는 방식은 이벤트를 이용하는 방식이다.
[종류]
대기열 처리 방식 - ActiveMQ, RabbitMQ (동기 통신, 단일 통신, Blocking)
Pub-sub 메시지 방식 - Apache Kafka (Non-Blocking, 비동기 통신, 양방향 통신 가능)

1: CUD 발생 후 이벤트 발행(PUB)
2: 이벤트 구독 후 CUD 실행(SUB)
3: CUD 실패 시 이벤트 로그 추적 후 rollback 실행
4: rollback 수행
실습 환경 : Spring boot, Mybatis, PostgreSQL
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, User> producerFactory() {
Map<String, Object> properties = new HashMap<>();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(properties);
}
@Bean
public KafkaTemplate<String, User> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
Producer Config 설정
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, User> consumerFactory() {
JsonDeserializer<User> deserializer = new JsonDeserializer<>(User.class);
deserializer.setRemoveTypeHeaders(false);
deserializer.addTrustedPackages("*");
deserializer.setUseTypeMapperForKey(true);
Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupId");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);
return new DefaultKafkaConsumerFactory<>(properties, new StringDeserializer(), deserializer);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, User> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, User> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
Consumer Config 설정
@Slf4j
@Component
public class KafkaProducer {
@Autowired
private KafkaTemplate<String, User> kafkaTemplate;
public void publish(User user) {
log.info("Kafka Producer sent data from the Prod MS: " + user.toString());
kafkaTemplate.send("consumerGroupId", user);
}
}
이후 Producer 쪽에서 이벤트를 발행하게 되면
@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaConsumer {
@Autowired
private UserService userService;
@KafkaListener(topics = "consumerGroupId", containerFactory = "kafkaListenerContainerFactory")
public void userListener(User user) {
try {
log.info("▶▶▶▶▶▶▶▶ RECEIVED DATA FROM KAFKA: " + user.toString());
userService.updateUser(user.getUserId());
} catch (Exception e) {
e.printStackTrace();
}
}
}
Consumer 쪽에서 발행된 이벤트를 전달 받게 된다.
설정과 진행 방식이 복잡하지만 오픈 소스 중에선 제일 검증이 된 Kafka를 사용하는 것이 좋을 것 같았다.