일단, 새롭게 추가된 도메인이 하나 있다. Group이라는 도메인인데, 사용자(user)가 속하게되는 그룹을 말한다. 사용자는 초대를 받아 그룹에 속하거나 새롭게 그룹을 만들 수 있다.
여기서 만약, 회원이 우리 서비스를 탈퇴하게 되었을 경우, 속한 group에도 회원이 사라져야한다. 이전처럼 feign을 이용하여 http 통신으로 구현할 수도 있겠지만, 별도의 응답이 필요없고 변경된 사항이 빠르게 반영되어야 하므로 카프카를 사용하게 되었다.
회원가입과 탈퇴를 담당하는 마이크로서비스, join service에서 사용자의 탈퇴 요청을 받고, 다시 user service로 요청을 보내 user 테이블에서 데이터가 최종적으로 삭제된다. user service는 탈퇴한 user의 id로 메시지를 만들어 deleted-user
토픽에 발행하게 된다. group service는 해당 토픽을 구독하고 있는 상태이므로 들어온 메시지를 consume 하여 group 테이블에도 반영되도록 하였다.
@EnableKafka
어노테이션을 추가해준다. 카프카 서버는 개발용도로 local에 띄워둔 서버를 사용했고 key, value가 String 타입이므로 StringSerializer를 추가해주었다. @Configuration
@EnableKafka
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> properties = new HashMap<>();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(properties);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
private final ObjectMapper mapper;
private final List<Field> fields = Collections.singletonList(new Field(FieldType.STRING, true, "userId"));
private final Schema schema = Schema.builder()
.type("struct")
.fields(fields)
.optional(false)
.name("users")
.build();
public Long send(String topic, Long userId) {
Payload payload = Payload.builder()
.userId(userId)
.build();
KafkaUserDto userDto = KafkaUserDto.builder()
.schema(schema)
.payload(payload)
.build();
String data = null;
try {
data = mapper.writeValueAsString(userDto);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
kafkaTemplate.send(topic, data);
log.info("[Kafka Producer SENT data - User service] : " + userDto);
return userId;
}
}
아래와 같이 메시지가 생성될 것이다.
@Configuration
@EnableKafka
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> properties = new HashMap<>();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(properties);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaConsumer {
private final GroupRepository groupRepository;
private final GroupUsersRepository groupUsersRepository;
private final ObjectMapper objectMapper;
@KafkaListener(topics = "deleted-user")
@Transactional
public void deleteUserInGroup(String kafkaMessage) throws JsonProcessingException {
Map<Object, Object> map = new HashMap<>();
JsonNode jsonNode = objectMapper.readTree(kafkaMessage);
Long userId = jsonNode.get("payload").get("userId").longValue();
GroupUsers groupUser = groupUsersRepository.findByUserId(userId);
if (groupUser.getGroupEntity() != null) {
groupUsersRepository.delete(groupUser);
GroupEntity group = groupUser.getGroupEntity();
if (groupUsersRepository.countByGroupEntity(group) == 0) {
groupRepository.delete(group);
}
}
}
}
예전에 한번 카프카가 어떤 것인지 배운적이 있었는데 실제 프로젝트에 적용해본 것은 이번이 처음이다! 카프카는 이 밖에도 여러 곳에서 쓰인다는데, 책 한권 사서 내부 동작과 현업에는 어떻게 사용되는지 깊이있게 배워보고 나중에 개발할 때에도 적절한 곳에 카프카를 잘 써보고 싶다.