Kafka 적용기

seokseungmin·2024년 11월 10일

Today I Learned

목록 보기
14/20
post-thumbnail

문제 정의

상황: 사용자가 자신의 MBTI를 수정하면, 이미 작성된 게시물의 MBTI 정보는 변경되지 않아 사용자 프로필과 게시물의 MBTI 정보가 일치하지 않는 문제가 발생합니다.
목표: 사용자가 MBTI를 변경할 때, 해당 사용자가 작성한 모든 게시물의 MBTI 정보도 자동으로 업데이트되도록 구현합니다.
MSA 적용으로 인해 user 모듈에 사용자 프로필 수정기능이 있고,
travel 모듈쪽에 여행 게시물 작성하는 기능이 분리 되어있어 발생한 문제.

전체 아키텍처 개요

이벤트 기반 아키텍처를 도입하여 모듈 간 데이터 동기화를 구현합니다.
Kafka를 활용하여 user 모듈과 travel 모듈 간의 통신을 처리합니다.
주요 흐름:
사용자가 MBTI를 변경하면 user 모듈에서 이벤트를 발행합니다.
travel 모듈에서 해당 이벤트를 수신하여 게시물의 MBTI 정보를 업데이트합니다.

zookeeper 실행 명령어

bin\windows\zookeeper-server-start.bat config\zookeeper.properties

kafka 실행 명령어

bin\windows\kafka-server-start.bat config\server.properties

만약 Zookeeper가 timeout exceed 18000 이렇게 계속 떠서
구글링해보니 tmp-> kafka-logs에 log파일 전체를 다 지우고
Zookeeper를 띄우면 해결된다해서 따라해보니 정상적으로 서버 동작.

먼저 Zookeeper랑 Kafka 실행명령어로 서버 두개 띄워두고,
apigateway 모듈 서버 띄우고, travel모듈 서버 띄우고, user 모듈 띄우고 테스트 해봄.

구현 순서 및 상세 내용

3.1. Kafka 의존성 추가
먼저, 프로젝트에서 Kafka를 사용할 수 있도록 의존성을 추가합니다.
user와 travel모듈 양쪽에 설정.

spring-kafka: Kafka를 사용하기 위한 핵심 라이브러리입니다.
spring-kafka-test: Kafka 관련 테스트를 위한 라이브러리입니다.

dependencies {
    // 기존 의존성들...
    implementation 'org.springframework.kafka:spring-kafka'
    testImplementation 'org.springframework.kafka:spring-kafka-test'
}

이벤트 클래스 생성

user 모듈과 travel 모듈에서 사용할 이벤트 클래스를 생성합니다. 두 모듈 모두에서 동일한 구조의 클래스를 사용해야 합니다.

UserMbtiChangedEvent.java

package com.zerobase.user.kafka;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class UserMbtiChangedEvent {
    private Long userId;
    private String newMbti;
}

userId: MBTI가 변경된 사용자 ID
newMbti: 변경된 MBTI 값
travel 모듈에서도 동일한 패키지 구조로 해당 클래스를 생성합니다.

package com.zerobase.travel.post.kafka;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class UserMbtiChangedEvent {
    private Long userId;
    private String newMbti;
}

user 모듈에서 이벤트 발행

1) Kafka 프로듀서 설정
Kafka로 이벤트를 발행하기 위해 프로듀서 설정을 추가합니다.

KafkaProducerConfig.java

package com.zerobase.user.config;

import com.zerobase.user.kafka.UserMbtiChangedEvent;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.*;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, UserMbtiChangedEvent> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // Kafka 서버 주소
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // 키 직렬화
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); // 값 직렬화
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, UserMbtiChangedEvent> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

이벤트 발행 로직 추가

사용자의 MBTI가 변경될 때 이벤트를 발행하도록 서비스 로직을 수정합니다.

ProfileService.java

package com.zerobase.user.service;

import com.zerobase.user.kafka.UserMbtiChangedEvent;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
@RequiredArgsConstructor
public class ProfileService {

    private final KafkaTemplate<String, UserMbtiChangedEvent> kafkaTemplate;

    @Value("${app.kafka.topics.user-mbti-changed}")
    private String userMbtiChangedTopic;

    @Transactional
    public void editProfile(ProfileRequestDTO profileRequest, UserEntity currentUser) {
        // 프로필 업데이트 로직...

        // MBTI 변경 여부 확인 및 이벤트 발행
        if (profileRequest.getMbti() != null) {
            String mbtiString = profileRequest.getMbti().toUpperCase();
            profile.setMbti(MBTI.valueOf(mbtiString));

            // 이벤트 발행
            UserMbtiChangedEvent event = new UserMbtiChangedEvent(currentUser.getId(), mbtiString);
            kafkaTemplate.send(userMbtiChangedTopic, event);
        }

        // 나머지 프로필 업데이트 로직...
    }
}

사용자의 MBTI가 변경되면 UserMbtiChangedEvent를 생성하여 Kafka 토픽에 발행합니다.

travel 모듈에서 이벤트 수신 및 처리

1) Kafka 컨슈머 설정
이벤트를 수신하기 위해 Kafka 컨슈머 설정을 추가합니다.

KafkaConsumerConfig.java

package com.zerobase.travel.post.config;

import com.zerobase.travel.post.kafka.UserMbtiChangedEvent;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ConsumerFactory<String, UserMbtiChangedEvent> consumerFactory() {
        Map<String, Object> props = new HashMap<>();

        // Kafka 브로커 주소
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

        // 그룹 ID 설정
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "travel-service-group");

        // Key와 Value Deserializer 설정
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

        // JsonDeserializer 설정
        JsonDeserializer<UserMbtiChangedEvent> valueDeserializer = new JsonDeserializer<>(UserMbtiChangedEvent.class);
        valueDeserializer.addTrustedPackages("*");

        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), valueDeserializer);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, UserMbtiChangedEvent> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, UserMbtiChangedEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

이벤트 수신 및 처리 로직 추가
이벤트를 수신하여 게시물의 MBTI 정보를 업데이트하는 리스너를 생성합니다.

UserMbtiChangedEventListener.java

package com.zerobase.travel.post.service;

import com.zerobase.travel.post.kafka.UserMbtiChangedEvent;
import lombok.RequiredArgsConstructor;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
@RequiredArgsConstructor
public class UserMbtiChangedEventListener {

    private final PostService postService;

    @KafkaListener(topics = "${app.kafka.topics.user-mbti-changed}", groupId = "travel-service-group")
    public void handleUserMbtiChangedEvent(UserMbtiChangedEvent event) {
        Long userId = event.getUserId();
        String newMbti = event.getNewMbti();

        // 게시물의 MBTI 업데이트
        postService.updatePostsMbti(userId, newMbti);
    }
}

@KafkaListener를 사용하여 지정한 토픽의 메시지를 수신합니다.
수신한 이벤트의 정보를 이용하여 게시물의 MBTI를 업데이트합니다.

엔티티 및 리포지토리 수정

1) PostEntity에 MBTI 필드 추가
게시물 엔티티에 작성자의 MBTI 정보를 저장하기 위해 필드를 추가합니다.

PostEntity.java

package com.zerobase.travel.post.entity;

import com.zerobase.travel.post.type.MBTI;
import javax.persistence.*;

@Entity
public class PostEntity {
    // 기존 필드들...

    @Enumerated(EnumType.STRING)
    private MBTI mbti;

    // Getter, Setter...
}

리포지토리에 업데이트 메서드 추가
사용자 ID를 기반으로 게시물의 MBTI를 업데이트하는 메서드를 추가합니다.

PostRepository.java

package com.zerobase.travel.post.repository;

import com.zerobase.travel.post.entity.PostEntity;
import com.zerobase.travel.post.type.MBTI;
import org.springframework.data.jpa.repository.*;
import org.springframework.data.repository.query.Param;
import org.springframework.stereotype.Repository;

@Repository
public interface PostRepository extends JpaRepository<PostEntity, Long> {

    @Modifying
    @Query("UPDATE PostEntity p SET p.mbti = :mbti WHERE p.userId = :userId")
    void updateMbtiByUserId(@Param("mbti") MBTI mbti, @Param("userId") Long userId);
}

서비스 로직에 업데이트 메서드 추가
게시물의 MBTI를 업데이트하는 서비스 로직을 추가합니다.

PostService.java

package com.zerobase.travel.post.service;

import com.zerobase.travel.post.repository.PostRepository;
import com.zerobase.travel.post.type.MBTI;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
@RequiredArgsConstructor
public class PostService {

    private final PostRepository postRepository;

    @Transactional
    public void updatePostsMbti(Long userId, String newMbti) {
        MBTI mbtiEnum;
        try {
            mbtiEnum = MBTI.valueOf(newMbti.toUpperCase());
        } catch (IllegalArgumentException e) {
            // 잘못된 MBTI 값 예외 처리
            return;
        }

        postRepository.updateMbtiByUserId(mbtiEnum, userId);
    }
}

설정 파일 수정

Kafka 설정을 위한 프로퍼티를 추가합니다.

application.yml (user 모듈)

spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

app:
  kafka:
    topics:
      user-mbti-changed: user-mbti-changed-topic

application.yml (travel 모듈)

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: travel-service-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring:
          json:
            trusted:
              packages: '*'

app:
  kafka:
    topics:
      user-mbti-changed: user-mbti-changed-topic

bootstrap-servers: Kafka 서버 주소를 설정합니다.
group-id: 컨슈머 그룹 ID를 설정합니다.
trusted.packages: 역직렬화 시 신뢰할 수 있는 패키지를 지정합니다.

작동 원리

1) 사용자 MBTI 변경
사용자가 프로필 수정 API를 호출하여 MBTI를 변경합니다.
ProfileService의 editProfile 메서드에서 MBTI 변경 여부를 확인합니다.
2) 이벤트 발행
MBTI가 변경되었을 경우, UserMbtiChangedEvent를 생성합니다.
KafkaTemplate을 이용하여 설정된 토픽에 이벤트를 발행합니다.
3) 이벤트 수신
travel 모듈의 UserMbtiChangedEventListener에서 해당 토픽의 이벤트를 수신합니다.
이벤트에 포함된 userId와 newMbti 정보를 추출합니다.
4) 게시물 MBTI 업데이트
PostService의 updatePostsMbti 메서드를 호출하여 해당 사용자의 모든 게시물의 MBTI 정보를 업데이트합니다.
PostRepository의 updateMbtiByUserId 메서드를 통해 일괄 업데이트를 수행합니다.
5) 데이터 일관성 유지
이를 통해 사용자 프로필과 게시물의 MBTI 정보가 일치하도록 데이터 일관성을 유지합니다.

마무리

이번 포스팅에서는 Kafka를 활용하여 사용자 MBTI 변경 시 게시물의 MBTI 정보도 자동으로 업데이트하는 기능을 구현하는 과정을 상세히 살펴보았습니다. 이벤트 기반 아키텍처를 도입함으로써 모듈 간 결합도를 낮추고 데이터 일관성을 유지할 수 있었습니다.

장점

확장성: 모듈 간 의존성을 줄여 향후 기능 확장이 용이합니다.
유지보수성: 이벤트 기반으로 동작하여 변경 사항이 발생해도 영향 범위가 최소화됩니다.
실시간 데이터 동기화: 이벤트를 통해 실시간으로 데이터가 동기화됩니다.

참고사항

실제 운영 환경에서는 Kafka 서버 설정과 보안 설정 등을 더욱 꼼꼼히 해야 합니다.
테스트 코드를 작성하여 이벤트 발행 및 수신이 제대로 이루어지는지 검증하는 것이 좋습니다.

profile

0개의 댓글