MSA - kafka (1)

주빈·2024년 1월 2일

MSA

목록 보기
4/8


오늘은 Kafka에 대해 기록해보자.

일단 Kafka에 대해서 말하기 전 내가 사용하려 했던 Event Driven에 대해 간단히 알아보자.

📘 Event-Driven

간단하게 분산된 시스템에서 이벤트를 생성(발행)하고 발행된 이벤트를 수신자에게 전송하는 구조로 수신자는 그 이벤트를 처리하는 방식의 아키텍처이다.

내가 구상한 방식은
Api gateway와 Service discovery는 기존과 같이 미적용
기존 프록시를 이용하여 웹서버 구성 후 로드밸런싱을 이용 및 라우팅
이벤트 드리븐과 CQRS를 병합하여 구성
CUD는 서비스간 통신 없이 Event 기반으로 kafka를 이용하여 데이터 동기화 시킨다.

Http 통신을 이용해서도 비동기 통신은 가능하지만 느슨한 결합 목적에 맞지 않고 속도와 가용성면에서 상당히 떨어지기에 대부분 이용하는 방식은 이벤트를 이용하는 방식이다.

[종류]
대기열 처리 방식 - ActiveMQ, RabbitMQ (동기 통신, 단일 통신, Blocking)
Pub-sub 메시지 방식 - Apache Kafka (Non-Blocking, 비동기 통신, 양방향 통신 가능)

📜 Kafka 작동 방식

1: CUD 발생 후 이벤트 발행(PUB)
2: 이벤트 구독 후 CUD 실행(SUB)
3: CUD 실패 시 이벤트 로그 추적 후 rollback 실행
4: rollback 수행

✏ kafka 실습

실습 환경 : Spring boot, Mybatis, PostgreSQL

  • Kafka 이용으로 PostgreSQL 동기화 작업 테스트를 해보았다.
    전에 테스트 해본 Debezium이나 nifi의 어려움이 많아 MSA 환경에서 제일 많이 사용하는 Kafka를 이용하여 데이터 동기화를 진행했다.
    Spring boot 환경에서 topic 이용으로 Pub 서비스에서 보낸 정보로 Sub 서비스의 PostgreSQL에도 똑같이 동기화 하는 작업에 성공했다.
    하지만 타 서비스에도 동일한 mapper 파일을 만들어서 동기화 해야 한다는 점이
    아쉬웠다.
@Configuration
public class KafkaProducerConfig {
	
	@Bean
	public ProducerFactory<String, User> producerFactory() {
		
		Map<String, Object> properties = new HashMap<>();
		properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
		properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
		
		return new DefaultKafkaProducerFactory<>(properties);
	}
	
	@Bean
	public KafkaTemplate<String, User> kafkaTemplate() {
		return new KafkaTemplate<>(producerFactory());
	}

}

Producer Config 설정

@EnableKafka
@Configuration
public class KafkaConsumerConfig {
	
	@Bean
	public ConsumerFactory<String, User> consumerFactory() {
		
		JsonDeserializer<User> deserializer = new JsonDeserializer<>(User.class);
		deserializer.setRemoveTypeHeaders(false);
		deserializer.addTrustedPackages("*");
		deserializer.setUseTypeMapperForKey(true);
		
		Map<String, Object> properties = new HashMap<>();
		properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
		properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupId");
		properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);
		
	    return new DefaultKafkaConsumerFactory<>(properties, new StringDeserializer(), deserializer);
	}
	
	@Bean
	public ConcurrentKafkaListenerContainerFactory<String, User> kafkaListenerContainerFactory() {

		ConcurrentKafkaListenerContainerFactory<String, User> factory = new ConcurrentKafkaListenerContainerFactory<>();
		factory.setConsumerFactory(consumerFactory());
		
		return factory;
	}

}

Consumer Config 설정

@Slf4j
@Component
public class KafkaProducer {
	
	@Autowired
	private KafkaTemplate<String, User> kafkaTemplate;
	
	public void publish(User user) {
		log.info("Kafka Producer sent data from the Prod MS: " + user.toString());
		kafkaTemplate.send("consumerGroupId", user);
	}
}

이후 Producer 쪽에서 이벤트를 발행하게 되면

@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaConsumer {
	
	@Autowired
	private UserService userService;
	
	@KafkaListener(topics = "consumerGroupId", containerFactory = "kafkaListenerContainerFactory")
	public void userListener(User user) {
		
		try {
			log.info("▶▶▶▶▶▶▶▶ RECEIVED DATA FROM KAFKA: " + user.toString());
			userService.updateUser(user.getUserId());
		} catch (Exception e) {
			e.printStackTrace();
		}
		
	}
}

Consumer 쪽에서 발행된 이벤트를 전달 받게 된다.

설정과 진행 방식이 복잡하지만 오픈 소스 중에선 제일 검증이 된 Kafka를 사용하는 것이 좋을 것 같았다.

profile
누구에게나 필요한 개발자가 꿈

0개의 댓글