[์‹๊ตฌํ•˜์ž_MSA] ๐Ÿš€ ์ฑ„ํŒ…&์•Œ๋ฆผ ๋งˆ์ดํฌ๋กœ ์„œ๋น„์Šค: ์ฑ„ํŒ… ๊ธฐ๋Šฅ ๊ฐœ๋ฐœ(Stomp, Kafka, MongoDB) - 2ํŽธ

์ด๋ฏผ์šฐยท2024๋…„ 3์›” 15์ผ
4

๐Ÿ€ ์‹๊ตฌํ•˜์ž_MSA

๋ชฉ๋ก ๋ณด๊ธฐ
5/21

๋ชฉ์ฐจ

  1. ์†Œ๊ฐœ
  2. Eureka Client ๋“ฑ๋ก
  3. Kafka, Zookeeper, MongoDB ์„ค์น˜์™€ ์„ธํŒ…
  4. Spring ์„ธํŒ…
  5. ๋„๋ฉ”์ธ ๋ชจ๋ธ
  6. ์„œ๋น„์Šค ๋กœ์ง
  7. ์ปจํŠธ๋กค๋Ÿฌ
  8. ๋งˆ๋ฌด๋ฆฌ

1. ์†Œ๊ฐœ


์ง€๋‚œ ํฌ์ŠคํŒ…์— ์ด์–ด ์ฑ„ํŒ… ๋งˆ์ดํฌ๋กœ ์„œ๋น„์Šค ๊ฐœ๋ฐœ ๊ณผ์ •์„ ํฌ์ŠคํŒ… ํ•˜๋„๋ก ํ•˜๊ฒ ์Šต๋‹ˆ๋‹ค. ์ฃผ์š” ์ฝ”๋“œ๋งŒ ๋‹ค๋ฃฐ ์˜ˆ์ •์ด๋ผ ๋‹ค๋ฃจ์ง€ ์•Š๋Š” ์ฝ”๋“œ๋“ค์€ ์•„๋ž˜ github๋ฅผ ํ†ตํ•ด ์ฐธ๊ณ ํ•˜์‹œ๊ธธ ๋ฐ”๋ž๋‹ˆ๋‹ค๐Ÿ™๐Ÿ™(msa-master ๋ธŒ๋žœ์น˜)

+) Spring Cloud๋ฅผ ํ†ตํ•œ MSA๋ฅผ ๊ตฌ์ถ•ํ•˜์‹œ๋Š” ๋ถ„์ด ์•„๋‹ˆ๋ผ๋ฉด ๋ชฉ์ฐจ 3๋ฒˆ๋ถ€ํ„ฐ ์ง„ํ–‰ํ•˜์‹œ๋ฉด ๋ฉ๋‹ˆ๋‹ค!

๐Ÿ‘‰ ์ฐธ๊ณ  ์ฝ”๋“œ : https://github.com/LminWoo99/PlantBackend/tree/msa-master

2. Eureka Client ๋“ฑ๋ก


1๏ธโƒฃ ์˜์กด์„ฑ ์ถ”๊ฐ€

implementation 'org.springframework.cloud:spring-cloud-starter-netflix-eureka-client'
ext {
    set('springCloudVersion', "2021.0.4")
}

2๏ธโƒฃ yml ์ž‘์„ฑ

server:
  port: 0
spring:
  application:
    name: [์„œ๋น„์Šค ๋ช…]
eureka:
  instance:
    prefer-ip-address: true
    instance-id: ${spring.application.name}:${spring.application.instance_id:${random.value}}
  client:
    register-with-eureka: true
    fetch-registry: true
    service-url:
      defaultZone: http://[๋„๋ฉ”์ธ]:8761/eureka

3๏ธโƒฃ @EurekaClient ๋“ฑ๋ก

@SpringBootApplication
@EnableEurekaClient
@EnableFeignClients
@EnableJpaAuditing
@EnableWebMvc
public class PlantChatServiceApplication {

    public static void main(String[] args) {
        SpringApplication.run(PlantChatServiceApplication.class, args);
    }

}

3. Kafka, Zookeeper, MongoDB ์„ค์น˜์™€ ์„ธํŒ…


3.1. docker-compose.yml ์ž‘์„ฑ

version: '3'
networks:
  plant-network:
    external: true
    name: plant-network

services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    networks:
      - plant-network
  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    depends_on:
      - zookeeper
    networks:
      - plant-network
  mongo:
    image: mongo:latest
    container_name: mongo
    ports:
      - "27017:27017"
    environment:
      MONGO_INITDB_ROOT_USERNAME: [์œ ์ €name]
      MONGO_INITDB_ROOT_PASSWORD: [์œ ์ € password]
    networks:
      - plant-network

docker-compose.yml ํŒŒ์ผ์— ์œ„์— ์ž‘์„ฑ๋œ ์ฝ”๋“œ๋ฅผ ๋ถ™์—ฌ ๋„ฃ์–ด ์ฃผ์‹œ๊ณ  ํ”„๋กœ์ ํŠธ ์ตœ์ƒ๋‹จ ์œ„์น˜์—์„œ ์•„๋ž˜์˜ ๋ช…๋ น์–ด๋ฅผ ์ž…๋ ฅํ•ด์ค๋‹ˆ๋‹ค.

docker-compose up -d

๐Ÿšจ ์ฃผ์˜์‚ฌํ•ญ!

MongoDB ๊ตฌ์ถ• ์‹œ username๊ณผ password๋ฅผ ์„ค์ •ํ•˜์ง€ ์•Š์œผ๋ฉด ์‹ฌ๊ฐํ•œ ๋ณด์•ˆ ์ด์Šˆ๊ฐ€ ๋ฐœ์ƒํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ์ตœ๊ทผ ๋งŽ์€ ์‚ฌ๋ก€์—์„œ ๋ณด์•ˆ์ด ์ทจ์•ฝํ•œ MongoDB ์ธ์Šคํ„ด์Šค๊ฐ€ ๋žœ์„ฌ์›จ์–ด ๊ณต๊ฒฉ์˜ ๋Œ€์ƒ์ด ๋˜์–ด ๋ฐ์ดํ„ฐ๊ฐ€ ์•”ํ˜ธํ™”๋˜๊ฑฐ๋‚˜ ์‚ญ์ œ๋˜๋Š” ์‚ฌ๊ณ ๊ฐ€ ๋ฐœ์ƒํ–ˆ์Šต๋‹ˆ๋‹ค. ์ด๋ฅผ ์˜ˆ๋ฐฉํ•˜๊ธฐ ์œ„ํ•ด ๋ฐ˜๋“œ์‹œ ๊ฐ•๋ ฅํ•œ ์ธ์ฆ ์ •๋ณด๋ฅผ ์„ค์ •ํ•˜๊ณ , ํ•„์š”ํ•œ ๊ฒฝ์šฐ์—๋งŒ ์™ธ๋ถ€์—์„œ ์ ‘๊ทผ ๊ฐ€๋Šฅํ•˜๋„๋ก ๋„คํŠธ์›Œํฌ ์„ค์ •์„ ํ•ด์•ผ ํ•ฉ๋‹ˆ๋‹ค.

3.2. Zookeeper ์„ค์ •

  • zookeeper docker ์ ‘์†
     docker exec -it zookeeper bash
    
  • Zookeeper ์„œ๋ฒ„๋ฅผ ์‹คํ–‰
    bash bin/zkServer.sh start /opt/zookeeper-3.4.13/conf/zoo.cfg -demon
    

3.3. Kafka ์„ค์ •

  • Kafka Docker ์ ‘์†
docker exec -it kafka bash
  • Kafka ์„œ๋ฒ„๋ฅผ ์‹คํ–‰
kafka-server-start.sh -daemon

Kafka ์„œ๋ฒ„ ์„ค์ •๊นŒ์ง€ ์™„๋ฃŒ ๋˜์—ˆ์Šต๋‹ˆ๋‹ค.

Kafka ํ† ํ”ฝ ์„ค์ • ๋ฐ ํ…Œ์ŠคํŠธ

Kafka ์„œ๋ฒ„ ์„ค์ •์ด ์™„๋ฃŒ๋˜์—ˆ์Šต๋‹ˆ๋‹ค. ์ด์ œ Kafka์˜ ํ•ต์‹ฌ ๊ฐœ๋…์ธ Topic์— ๋Œ€ํ•ด ์•Œ์•„๋ณด๊ณ , ํ…Œ์ŠคํŠธ๋ฅผ ์ง„ํ–‰ํ•˜๊ฒ ์Šต๋‹ˆ๋‹ค.

Kafka Topic ์ดํ•ดํ•˜๊ธฐ

Kafka์—์„œ Topic์€ ๋ฉ”์‹œ์ง€๋ฅผ ๊ตฌ๋ถ„ํ•˜๋Š” ๋‹จ์œ„์ž…๋‹ˆ๋‹ค. pub/sub ๋ชจ๋ธ์„ ํ†ตํ•ด ๋ฉ”์‹œ์ง€์˜ ์ƒ์‚ฐ๊ณผ ์†Œ๋น„๊ฐ€ ์ด๋ฃจ์–ด์ง€๋ฉฐ, ์ด๋ฅผ ์œ„ํ•ด์„œ๋Š” Topic์„ ๋จผ์ € ์ƒ์„ฑํ•ด์•ผ ํ•ฉ๋‹ˆ๋‹ค.

๐Ÿ’ก์ฐธ๊ณ : ์‹ค์ œ ์ฑ„ํŒ… ๊ธฐ๋Šฅ์— ์‚ฌ์šฉํ•  Topic์€ Spring ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜์—์„œ ์ž๋™์œผ๋กœ ์ƒ์„ฑํ•  ์˜ˆ์ •์ž…๋‹ˆ๋‹ค.

ํ…Œ์ŠคํŠธ Topic ์ƒ์„ฑ ๋ฐ ํ™•์ธ

Kafka๊ฐ€ ์ •์ƒ์ ์œผ๋กœ ์ž‘๋™ํ•˜๋Š”์ง€ ํ™•์ธํ•˜๊ธฐ ์œ„ํ•ด ํ…Œ์ŠคํŠธ์šฉ Topic์„ ์ƒ์„ฑํ•ด๋ณด๊ฒ ์Šต๋‹ˆ๋‹ค.

  1. Kafka Bash์— ์ ‘์†ํ•ฉ๋‹ˆ๋‹ค.
  2. ๋‹ค์Œ ๋ช…๋ น์–ด๋กœ ํ…Œ์ŠคํŠธ Topic์„ ์ƒ์„ฑํ•ฉ๋‹ˆ๋‹ค:
kafka-topics.sh --create --zookeeper zookeeper:2181 --topic test-topic --partitions 1 --replication-factor 1
  1. Topic์ด ์ž˜ ์ƒ์„ฑ๋˜์—ˆ๋Š”์ง€ ํ™•์ธํ•ฉ๋‹ˆ๋‹ค:
kafka-topics.sh --list --zookeeper zookeeper

์ด ๋ช…๋ น์–ด๋ฅผ ์‹คํ–‰ํ•˜๋ฉด test-topic์ด ๋ชฉ๋ก์— ํ‘œ์‹œ๋˜์–ด์•ผ ํ•ฉ๋‹ˆ๋‹ค.

๐Ÿš€ Tip: ์‹ค์ œ ํ”„๋กœ๋•์…˜ ํ™˜๊ฒฝ์—์„œ๋Š” ๋ณด์•ˆ๊ณผ ์„ฑ๋Šฅ์„ ๊ณ ๋ คํ•˜์—ฌ Topic์˜ ํŒŒํ‹ฐ์…˜ ์ˆ˜์™€ ๋ณต์ œ ํŒฉํ„ฐ๋ฅผ ์ ์ ˆํžˆ ์กฐ์ •ํ•ด์•ผ ํ•ฉ๋‹ˆ๋‹ค.

4. Spring ์„ธํŒ…


4.1. Kafka ์„ค์ •

Kafka ์ปจ์Šˆ๋จธ ์„ค์ •

@EnableKafka
@Configuration
public class KafkaConsumerConfig {
    @Value("${kafka.url}")
    private String kafkaServerUrl;

    @Bean
    ConcurrentKafkaListenerContainerFactory<String, Message> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Message> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public ConsumerFactory<String, Message> consumerFactory() {
        JsonDeserializer<ChatMessage> deserializer = new JsonDeserializer<>();
        deserializer.addTrustedPackages("*");
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServerUrl);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "chat");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), deserializer);
    }
}

Kafka ํ”„๋กœ๋“€์„œ ์„ค์ •

@EnableKafka
@Configuration
public class KafkaProducerConfig {
    @Value("${kafka.url}")
    private String kafkaServerUrl;
    @Bean
    public ProducerFactory<String, Message> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigurations());
    }

   // Kafka Producer ๊ตฌ์„ฑ์„ ์œ„ํ•œ ์„ค์ •๊ฐ’๋“ค์„ ํฌํ•จํ•œ ๋งต์„ ๋ฐ˜ํ™˜ํ•˜๋Š” ๋ฉ”์„œ๋“œ
    @Bean
    public Map<String, Object> producerConfigurations() {
        return ImmutableMap.<String, Object>builder()
                .put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServerUrl)
                .put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
                .put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class)
                //๋ฉฑ๋“ฑ์„ฑ ํ”„๋กœ๋“€์„œ ๋ช…์‹œ์  ์„ค์ •
                .put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)
                .build();
    }
}

4.2. MongoDB ์„ค์ •

mongodb:
  client: mongodb://{ip์ฃผ์†Œ}:27017
  name : {db ์ด๋ฆ„}

์„ค์ • ํด๋ž˜์Šค์— ์‚ฌ์šฉํ•  properties ํŒŒ์ผ์„ ์ž‘์„ฑ

@Data
@Component
@ConfigurationProperties(prefix = "mongo-db")
public class MongoDbProperties {
    private String connectionString;
    private String databaseName;
}

MongoConfig ํด๋ž˜์Šค ์ž‘์„ฑ

// MongoDB ์„ค์ • ํด๋ž˜์Šค
@Configuration
@RequiredArgsConstructor
@EnableMongoRepositories(basePackages = "com.example.plantchatservice.repository.mongo")
public class MongoDbConfiguration {
    private final MongoDbSettings mongoDbSettings;

    // MongoDB ํด๋ผ์ด์–ธํŠธ ๋นˆ ์ƒ์„ฑ
    @Bean
    public MongoClient createMongoClient() {
        return MongoClients.create(mongoDbSettings.getConnectionString());
    }

    // MongoTemplate ๋นˆ ์ƒ์„ฑ
    @Bean
    public MongoTemplate createMongoTemplate() {
        return new MongoTemplate(createMongoClient(), mongoDbSettings.getDatabaseName());
    }
}

4.3. WebSocket ์„ค์ •

@Configuration
@RequiredArgsConstructor
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    private final StompHandler stompHandler;

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/chat")
                .setAllowedOriginPatterns("*")
                .withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableSimpleBroker("/subscribe");
        registry.setApplicationDestinationPrefixes("/publish");
    }

    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.interceptors(stompHandler);
    }
}

์›น์†Œ์ผ“ ์„ค์ • ์ •๋ณด๋Š” ์ค‘์š”ํ•˜๋‹ˆ ๋ฉ”์„œ๋“œ๋ณ„๋กœ ์ž์„ธํžˆ ์•Œ์•„๋ณด๋„๋ก ํ•˜๊ฒ ์Šต๋‹ˆ๋‹ค!

1. STOMP ์—”๋“œํฌ์ธํŠธ ๋“ฑ๋ก

@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
    registry.addEndpoint("/chat")
            .setAllowedOriginPatterns("*")
            .withSockJS();
}
  • ์—”๋“œํฌ์ธํŠธ: /chat
  • ๋ชฉ์ : ํ”„๋ก ํŠธ์—”๋“œ์—์„œ STOMP ์—ฐ๊ฒฐ ์‹œ๋„ ์‹œ ์‚ฌ์šฉํ•  ์—”๋“œํฌ์ธํŠธ ์ง€์ •
  • โš ๏ธ ์ฃผ์˜์‚ฌํ•ญ:
    • setAllowedOriginPatterns("*"): ๋ชจ๋“  ์˜ค๋ฆฌ์ง„ ํ—ˆ์šฉ (๊ฐœ๋ฐœ ํ™˜๊ฒฝ)
    • ํ”„๋กœ๋•์…˜ ํ™˜๊ฒฝ์—์„œ๋Š” ๋ณด์•ˆ์„ ์œ„ํ•ด ํŠน์ • ์˜ค๋ฆฌ์ง„๋งŒ ํ—ˆ์šฉํ•ด์•ผ ํ•จ
  • withSockJS(): SockJS ์ง€์› ํ™œ์„ฑํ™”

2. ๋ฉ”์‹œ์ง€ ๋ธŒ๋กœ์ปค ๊ตฌ์„ฑ

@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
    registry.enableSimpleBroker("/subscribe");
    registry.setApplicationDestinationPrefixes("/publish");
}
  • ๊ตฌ๋… ์ ‘๋‘์‚ฌ: /subscribe
    • ์‚ฌ์šฉ ์˜ˆ: /subscribe/{chatNo} (์ฑ„ํŒ…๋ฐฉ ๋ฒˆํ˜ธ๋กœ ๊ตฌ๋…)
  • ๋ฉ”์‹œ์ง€ ์ „์†ก ์ ‘๋‘์‚ฌ: /publish
    • ์‚ฌ์šฉ ์˜ˆ: /publish/message (๋ฉ”์‹œ์ง€ ์ „์†ก ์ปจํŠธ๋กค๋Ÿฌ๋กœ ๋ผ์šฐํŒ…)

3. ํด๋ผ์ด์–ธํŠธ ์ธ๋ฐ”์šด๋“œ ์ฑ„๋„ ๊ตฌ์„ฑ


@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
    registration.interceptors(stompHandler);
}
  • ๋ชฉ์ : STOMP ๋ฉ”์‹œ์ง€ ํ•ธ๋“ค๋ง
  • stompHandler๋ฅผ ์ธํ„ฐ์…‰ํ„ฐ๋กœ ๋“ฑ๋ก
    • ๋“ค์–ด์˜ค๋Š” STOMP ๋ฉ”์‹œ์ง€ ์ฒ˜๋ฆฌ
    • ํ•„์š”์‹œ ์ถ”๊ฐ€์ ์ธ ๋กœ์ง ๊ตฌํ˜„ ๊ฐ€๋Šฅ (์˜ˆ: ์ธ์ฆ, ๋กœ๊น… ๋“ฑ)

๐Ÿ’ก ํ•ต์‹ฌ ํฌ์ธํŠธ

  1. ์—”๋“œํฌ์ธํŠธ: /chat์„ ํ†ตํ•ด WebSocket ์—ฐ๊ฒฐ
  2. ๊ตฌ๋…: /subscribe/{chatNo}๋กœ ํŠน์ • ์ฑ„ํŒ…๋ฐฉ ๊ตฌ๋…
  3. ๋ฐœํ–‰: /publish/message๋กœ ๋ฉ”์‹œ์ง€ ์ „์†ก
  4. ๋ณด์•ˆ: StompHandler๋ฅผ ํ†ตํ•œ ๋ฉ”์‹œ์ง€ ์ฒ˜๋ฆฌ ๋ฐ ์ถ”๊ฐ€ ๋กœ์ง ๊ตฌํ˜„ ๊ฐ€๋Šฅ

์ด ์„ค์ •์„ ํ†ตํ•ด ์ฑ„ํŒ…๋ฐฉ๋ณ„๋กœ ํšจ๊ณผ์ ์ธ ๋ฉ”์‹œ์ง€ ๊ด€๋ฆฌ ๋ฐ ์ „๋‹ฌ์ด ๊ฐ€๋Šฅํ•ฉ๋‹ˆ๋‹ค.

5. ๋„๋ฉ”์ธ ๋ชจ๋ธ


MongoDB์—์„œ ๋ฉ”์‹œ์ง€ ์ €์žฅ์— ์‚ฌ์šฉํ•  ๋„๋ฉ”์ธ (์ฑ„ํŒ… ๋‚ด์—ญ ๋ฐ์ดํ„ฐ)

@Document(collection="chatting")
@Getter
@NoArgsConstructor(access = AccessLevel.PROTECTED)
public class Chatting {
    @Id
    private String id;
    private Integer chatRoomNo;
    private Integer senderNo;
    private String senderName;
    private String contentType;
    private String content;
    private LocalDateTime sendDate;
    private long readCount;

    @Builder
    public Chatting(String id, Integer chatRoomNo, Integer senderNo, String senderName, String contentType, String content, LocalDateTime sendDate, long readCount) {
        this.id = id;
        this.chatRoomNo = chatRoomNo;
        this.senderNo = senderNo;
        this.senderName = senderName;
        this.contentType = contentType;
        this.content = content;
        this.sendDate = sendDate;
        this.readCount = readCount;
    }
}

Kafka์—์„œ ๋ฉ”์‹œ์ง€ ์ „๋‹ฌ์— ์‚ฌ์šฉํ•  ๋„๋ฉ”์ธ ๋ชจ๋ธ

+) ์ถ”๊ฐ€๋กœ msa๋ฅผ ๊ตฌ์ถ•ํ•˜์‹œ๋Š” ๋ถ„๋“ค์€ ์›น์†Œ์ผ“์€ ws ํ”„๋กœํ† ์ฝœ์„ ์‚ฌ์šฉํ•˜๊ธฐ ๋–„๋ฌธ์— gateway yml์— ์•„๋ž˜ ์ฝ”๋“œ๋ฅผ ์ถ”๊ฐ€ํ•ด์ค˜์•ผ ํ•ฉ๋‹ˆ๋‹ค!!
        - id: plant-chat-service
          uri: lb:ws://PLANT-CHAT-SERVICE
          predicates:
            - Path=/plant-chat-service/chat/**
          filters:
            - RemoveRequestHeader=Cookie
            - RewritePath=/plant-chat-service/(?<segment>.*), /$\{segment}

6. ์„œ๋น„์Šค ๋กœ์ง


๋จผ์ € ์ €๋Š” plant-service(์ค‘๊ณ  ๊ฑฐ๋ž˜ ๊ฒŒ์‹œ๊ธ€ ๋งˆ์ดํฌ๋กœ ์„œ๋น„์Šค)์— ์ค‘๊ณ  ๊ฑฐ๋ž˜ ๊ฒŒ์‹œ๊ธ€ ๋ฐ์ดํ„ฐ๊ฐ€ ์žˆ๊ธฐ ๋–„๋ฌธ์— FeignCleint๋ฅผ ํ†ตํ•ด ํ•„์š”ํ•œ ์ •๋ณด๋ฅผ ๊ฐ€์ ธ์™”์Šต๋‹ˆ๋‹ค.

๊ทธ๋ž˜์„œ ํ˜ธ์ถœํ•  ์—”๋“œํฌ์ธํŠธ์™€ ๋ฉ”์„œ๋“œ์— ๋งž๊ฒŒ ์•„๋ž˜์™€ ๊ฐ™์€ ํ˜•์‹์œผ๋กœ ์ž‘์„ฑ์‹œ๋ฉด ๋ฉ๋‹ˆ๋‹ค!!

FeignClient ์ฝ”๋“œ
@FeignClient(name="plant-service")
public interface PlantServiceClient {
    @GetMapping("{์—”๋“œํฌ์ธํŠธ}")
    ResponseEntity<ResponseTradeBoardDto> boardContent(@PathVariable(value = "id") Long id);

    @GetMapping("{์—”๋“œํฌ์ธํŠธ}")
    ResponseEntity<MemberDto> findById(@RequestParam Long id);

    @GetMapping("{์—”๋“œํฌ์ธํŠธ}")
    ResponseEntity<MemberDto> findByUsername(@RequestParam String username);


    @GetMapping("{์—”๋“œํฌ์ธํŠธ}")
    ResponseEntity<MemberDto> getJoinMember(@RequestHeader("Authorization") String jwtToken);
}

์ฑ„ํŒ… ์„œ๋น„์Šค ๋กœ์ง

@Slf4j
@Service
@RequiredArgsConstructor
public class ChatService {

    private final ChatRepository chatRepository;
    private final MongoChatRepository mongoChatRepository;
    private final MessageSender sender;
    private final AggregationSender aggregationSender;
    private final MongoTemplate mongoTemplate;
    private final ChatRoomService chatRoomService;
    private final PlantServiceClient plantServiceClient;
    private final CircuitBreakerFactory circuitBreakerFactory;
    private final TokenHandler tokenHandler;

    private final NotificationService notificationService;

    /**
     * ์ฑ„ํŒ…๋ฐฉ ์ƒ์„ฑ ๋ฉ”์„œ๋“œ
     * ๊ฑฐ๋ž˜ ๊ฒŒ์‹œ๊ธ€์„ ์˜ฌ๋ฆฌ์ง€ ์•Š์€ ์‚ฌ๋žŒ๋งŒ ํ˜ธ์ถœํ•˜๋Š” ๋ฉ”์„œ๋“œ
     * ๊ตฌ๋งค ํฌ๋ง์ž๋งŒ ์ฑ„ํŒ…๋ฐฉ์„ ์ƒ์„ฑ ๊ฐ€๋Šฅ
     * FeignCLient๋ฅผ ํ†ตํ•ด plant-service์—์„œ ๊ฑฐ๋ž˜๊ฐ€๋Šฅ ์—ฌ๋ถ€ ํ™•์ธ
     * @param : MemberDto memberDto, ChatRequestDto requestDto
     */
    @Transactional
    public Chat makeChatRoom(Integer memberNo, ChatRequestDto requestDto) {
        CircuitBreaker circuitBreaker = circuitBreakerFactory.create("circuitbreaker");
        // ์ฑ„ํŒ…์„ ๊ฑธ๋ ค๊ณ  ํ•˜๋Š” ๊ฑฐ๋ž˜๊ธ€์ด ๊ฑฐ๋ž˜ ๊ฐ€๋Šฅ ์ƒํƒœ์ธ์ง€ ์กฐํšŒํ•ด๋ณธ๋‹ค.
        ResponseEntity<ResponseTradeBoardDto> tradeBoardDto = circuitBreaker.run(() ->
                        plantServiceClient.boardContent(requestDto.getTradeBoardNo().longValue()),
                throwable -> ResponseEntity.ok(null));

        // ์กฐํšŒํ•ด์˜จ ๊ฑฐ๋ž˜๊ธ€ ์ƒํƒœ๊ฐ€ ๊ฑฐ๋ž˜์™„๋ฃŒ ์ด๋ผ๋ฉด ๊ฑฐ๋ž˜๊ฐ€ ๋ถˆ๊ฐ€๋Šฅํ•œ ์ƒํƒœ์ด๋‹ค.
        if (tradeBoardDto.getBody().getStatus().equals("๊ฑฐ๋ž˜์™„๋ฃŒ")) {
            throw new IllegalStateException("ํ˜„์žฌ ๊ฑฐ๋ž˜๊ฐ€๋Šฅ ์ƒํƒœ๊ฐ€ ์•„๋‹™๋‹ˆ๋‹ค.");
        }
        Integer tradeBoardNo = tradeBoardDto.getBody().getId().intValue();

        //์ด๋ฏธ ํ•ด๋‹น๊ธ€ ๊ธฐ์ค€์œผ๋กœ ์ฑ„ํŒ…์„ ์š”์ฒญํ•œ ์‚ฌ๋žŒ๊ณผ ๋ฐ›๋Š” ์‚ฌ๋žŒ์ด ์ผ์น˜ํ•  ๊ฒฝ์šฐ ์ฒดํฌ
        if (chatRepository.existChatRoomByBuyer(tradeBoardNo, requestDto.getCreateMember(), memberNo)) {
            Chat existedChat = chatRepository.findByTradeBoardNoAndChatNo(tradeBoardNo, requestDto.getCreateMember());
            return existedChat;

        }
        if (!chatRepository.existChatRoomByBuyer(tradeBoardNo, requestDto.getCreateMember(), memberNo)) {
            Chat chat = Chat.builder()
                    .tradeBoardNo(requestDto.getTradeBoardNo())
                    .createMember(requestDto.getCreateMember())
                    .joinMember(memberNo)
                    .regDate(LocalDateTime.now())
                    .build();

            Chat savedChat = chatRepository.save(chat);

            return savedChat;
        }
        throw new IllegalArgumentException("์กด์žฌํ•˜์ง€ ์•Š๋Š” ๊ฒŒ์‹œ๊ธ€ ์ž…๋‹ˆ๋‹ค");
    }
    /**
     * ์ฑ„ํŒ…๋ฐฉ ๋ฆฌ์ŠคํŠธ ์กฐํšŒ ๋ฉ”์„œ๋“œ
     * FeignCLient๋ฅผ ํ†ตํ•ด plant-service์—์„œ ์œ ์ € ์ •๋ณด ์กฐํšŒํ›„ ์ฑ„ํŒ…๋ฐฉ ๋งŒ๋“  ์‚ฌ๋žŒ์ธ์ง€ ํ™•์ธ
     * mongodb์—์„œ ์ฑ„ํŒ… ๋ฉ”์„ธ์ง€ ๋ณด๋‚ธ ์‹œ๊ฐ„์„ ๋‚ด๋ฆผ์ฐจ์ˆœ์œผ๋กœ ์ •๋ ฌํ›„ ์ฒซ๋ฒˆ์งธ ๊ฐ’ ๋งˆ์ง€๋ง‰ ๋ฉ”์„ธ์ง€๋กœ ์„ธํŒ…
     * @param : Integer memberNo, Integer tradeBoardNo
     */
    public List<ChatRoomResponseDto> getChatList(Integer memberNo, Integer tradeBoardNo) {
        List<ChatRoomResponseDto> chatRoomList = chatRepository.getChattingList(memberNo, tradeBoardNo);
        //Participant ์ฑ„์›Œ์•ผ๋จ(username)
            chatRoomList
                    .forEach(chatRoomDto -> {
                        //param์œผ๋กœ ๋„˜์–ด์˜จ ๋ฉค๋ฒ„๊ฐ€ ์ฑ„ํŒ… ๋งŒ๋“  ๋ฉค๋ฒ„์ผ ๊ฒฝ์šฐ => Participant์— ์ฐธ๊ฐ€ํ•œ ๋ฉค๋ฒ„
//                        ResponseEntity<MemberDto> byId = plantServiceClient.findById(chatRoomDto.getCreateMember().longValue());
                        if (memberNo.equals(chatRoomDto.getCreateMember())) {
                            ResponseEntity<MemberDto> memberDtoResponse = plantServiceClient.findById(chatRoomDto.getJoinMember().longValue());

                            chatRoomDto.setParticipant(new ChatRoomResponseDto.Participant(memberDtoResponse.getBody().getUsername(), memberDtoResponse.getBody().getNickname()));
                        }
                        //param์œผ๋กœ ๋„˜์–ด์˜จ ๋ฉค๋ฒ„๊ฐ€ ์ฑ„ํŒ…๋ฐฉ์— ์ฐธ๊ฐ€ํ•œ ๋ฉค๋ฒ„์ผ ๊ฒฝ์šฐ => Participant์— ์ฑ„ํŒ…๋ฐฉ ๋งŒ๋“  ๋ฉค๋ฒ„
                        if (!memberNo.equals(chatRoomDto.getCreateMember())){
                            ResponseEntity<MemberDto> memberDtoResponse = plantServiceClient.findById(chatRoomDto.getCreateMember().longValue());
                            chatRoomDto.setParticipant(new ChatRoomResponseDto.Participant(memberDtoResponse.getBody().getUsername(), memberDtoResponse.getBody().getNickname()));
                        }
//                      // ์ฑ„ํŒ…๋ฐฉ๋ณ„๋กœ ์ฝ์ง€ ์•Š์€ ๋ฉ”์‹œ์ง€ ๊ฐœ์ˆ˜๋ฅผ ์…‹ํŒ…
                        long unReadCount = countUnReadMessage(chatRoomDto.getChatNo(), memberNo);
                        chatRoomDto.setUnReadCount(unReadCount);

                        // ์ฑ„ํŒ…๋ฐฉ๋ณ„๋กœ ๋งˆ์ง€๋ง‰ ์ฑ„ํŒ…๋‚ด์šฉ๊ณผ ์‹œ๊ฐ„์„ ์…‹ํŒ…
                        Page<Chatting> chatting =
                                mongoChatRepository.findByChatRoomNoOrderBySendDateDesc(chatRoomDto.getChatNo(), PageRequest.of(0, 1));
                        if (chatting.hasContent()) {
                            Chatting chat = chatting.getContent().get(0);
                            ChatRoomResponseDto.LatestMessage latestMessage = ChatRoomResponseDto.LatestMessage.builder()
                                    .context(chat.getContent())
                                    .sendAt(chat.getSendDate())
                                    .build();
                            chatRoomDto.setLatestMessage(latestMessage);
                        }
                    });

        return chatRoomList;
    }
    /**
     * ์ฑ„ํŒ… ๋ฉ”์„ธ์ง€ ์กฐํšŒ ๋ฉ”์„œ๋“œ
     * ์ฑ„ํŒ… ๋ฉ”์„ธ์ง€ ์กฐํšŒ์‹œ ํ•ด๋‹น ๋ฉ”์„ธ์ง€๋ฅผ ์ฝ์€ ๊ฒƒ์ด๋ฏ€๋กœ ๋ฉ”์„ธ์ง€ ์ฝ์Œ ์ฒ˜๋ฆฌ๋„ ์ง„ํ–‰
     * @param : Integer chatRoomNo, Integer memberNo
     */
    public ChattingHistoryResponseDto getChattingList(Integer chatRoomNo, Integer memberNo) {
        CircuitBreaker circuitBreaker = circuitBreakerFactory.create("circuitbreaker");
        // member id๋กœ ์กฐํ™”
        ResponseEntity<MemberDto> memberDto = circuitBreaker.run(() -> plantServiceClient.findById(memberNo.longValue()),
                throwable -> ResponseEntity.ok(null));

        updateCountAllZero(chatRoomNo, memberDto.getBody().getUsername());
        List<ChatResponseDto> chattingList = mongoChatRepository.findByChatRoomNo(chatRoomNo)
                .stream()
                .map(chat -> new ChatResponseDto(chat, memberNo)
                )
                .collect(Collectors.toList());

        return ChattingHistoryResponseDto.builder()
                .chatList(chattingList)
                .email(memberDto.getBody().getEmail())
                .build();
    }
    /**
     * ๋ฉ”์„ธ์ง€ ์ „์†ก ๋ฉ”์„œ๋“œ
     * jwt ํ† ํฐ์—์„œ username ์ถ”์ถœ
     * ์นดํ”„์นด ํ† ํ”ฝ์œผ๋กœ ๋ฉ”์„ธ ์ „์†ก
     * @param : Message message, String accessToken
     */
    public void sendMessage(Message message, String accessToken) {
        // member id๋กœ ์กฐํ™”
        ResponseEntity<MemberDto> memberDto = plantServiceClient.findByUsername(tokenHandler.getUid(accessToken));

        // ์ฑ„ํŒ…๋ฐฉ์— ๋ชจ๋“  ์œ ์ €๊ฐ€ ์ฐธ์—ฌ์ค‘์ธ์ง€ ํ™•์ธํ•œ๋‹ค.
        boolean isConnectedAll = chatRoomService.isAllConnected(message.getChatNo());
        // 1:1 ์ฑ„ํŒ…์ด๋ฏ€๋กœ 2๋ช… ์ ‘์†์‹œ readCount 0, ํ•œ๋ช… ์ ‘์†์‹œ 1
        Integer readCount = isConnectedAll ? 0 : 1;
        // message ๊ฐ์ฒด์— ๋ณด๋‚ธ์‹œ๊ฐ„, ๋ณด๋‚ธ์‚ฌ๋žŒ memberNo, ๋‹‰๋„ค์ž„์„ ์…‹ํŒ…ํ•ด์ค€๋‹ค.
        message.setSendTimeAndSender(LocalDateTime.now(), memberDto.getBody().getId().intValue(), memberDto.getBody().getNickname(), readCount);
	// ๋ณด๋‚ธ ์‚ฌ๋žŒ์ผ ๊ฒฝ์šฐ์—๋งŒ ๋ฉ”์‹œ์ง€๋ฅผ ์ €์žฅ -> ์ค‘๋ณต ์ €์žฅ ๋ฐฉ์ง€
if (message.getSenderEmail().equals(memberDto.getBody().getEmail())) {
    // Message ๊ฐ์ฒด๋ฅผ ์ฑ„ํŒ… ์—”ํ‹ฐํ‹ฐ๋กœ ๋ณ€ํ™˜
    Chatting chatting = message.convertEntity();
    // ์ฑ„ํŒ… ๋‚ด์šฉ์„ ์ €์žฅ
    Chatting savedChat = mongoChatRepository.save(chatting);
    // ์ €์žฅ๋œ ๊ณ ์œ  ID๋ฅผ ๋ฐ˜ํ™˜
    message.setId(savedChat.getId());
}

        // ๋ฉ”์‹œ์ง€๋ฅผ ์ „์†กํ•œ๋‹ค.
        sender.send(KafkaUtil.KAFKA_TOPIC, message);
    }
    /
    /**
     * ์ฐธ๊ฐ€์ž ์ž…์žฅ ์•Œ๋ฆผ ๋ฉ”์„œ๋“œ
     * @param : String email, Integer chatRoomNo
     */
    public void updateMessage(String email, Integer chatRoomNo) {
        Message message = Message.builder()
                .contentType("notice")
                .chatNo(chatRoomNo)
                .content(email + " ๋‹˜์ด ๋Œ์•„์˜ค์…จ์Šต๋‹ˆ๋‹ค.")
                .build();

        sender.send(KafkaUtil.KAFKA_TOPIC, message);
    }


    /**
     * ํŒ๋งค์ž๊ฐ€ ์ฐธ๊ฐ€ํ•œ  ์ฑ„ํŒ…๋ฐฉ์ด ์กด์žฌํ•˜๋Š”์ง€ ์œ ๋ฌด ์ฒ˜๋ฆฌ ๋ฉ”์„œ๋“œ
     * ๋‹จ์ˆœ ์กฐํšŒ์šฉ ๋ฉ”์„œ๋“œ๋ผ readOnly = true
     *
     * @param : Integer tradeBoardNo,  Integer memberNo
     */
    @Transactional(readOnly = true)
    public Boolean existChatRoomBySeller(Integer tradeBoardNo, Integer memberNo) {
        return chatRepository.existChatRoomBySeller(tradeBoardNo, memberNo);
    }

 }

์ฝ”๋“œ์˜ ์ค‘์š”ํ•œ ๋‚ด์šฉ์€ ์ฃผ์„ ๋‹ฌ์•„๋†จ์Šต๋‹ˆ๋‹ค!

์œ„์—์„œ ๋งŒ๋“  ๋งˆ์ดํฌ๋กœ ์„œ๋น„์Šค๊ฐ„์— ํ†ต์‹ ์„ ํ•˜๊ธฐ ์œ„ํ•ด PlantServiceClient(FeignClient)์—์„œ ๋ฉ”์„œ๋“œ๋ฅผ ํ˜ธ์ถœํ–ˆ๊ณ , ์™ธ๋ถ€ api๊ฐ„์— ์žฅ์•  ๊ด€๋ฆฌ๋ฅผ ํ•˜๊ธฐ ์œ„ํ•ด CircuitBreaker๋ฅผ ์‚ฌ์šฉํ–ˆ์Šต๋‹ˆ๋‹ค.

  • ๋ฉ”์„ธ์ง€๋ฅผ ๋ณด๋‚ธ ์œ ์ €์™€ ํ˜„์žฌ ์ ‘์†ํ•œ ์œ ์ €๊ฐ€ ๋™์ผํ•˜๋ฉด DB์— ์ฑ„ํŒ… ๋‚ด์—ญ์„ ์ €์žฅํ•ฉ๋‹ˆ๋‹ค.
  • sender.send : Kafka ํ”„๋กœ๋“€์„œ๋Š” KafkaUtil.KAFKA_TOPIC์— ์ง€์ •๋œ ํ† ํ”ฝ์œผ๋กœ message ๊ฐ์ฒด๋ฅผ ์ „์†กํ•ฉ๋‹ˆ๋‹ค.

7. ์ปจํŠธ๋กค๋Ÿฌ


@Slf4j
@RestController
@RequiredArgsConstructor
public class ChatController {
    private final ChatRoomService chatRoomService;
    private final ChatService chatService;

    @PostMapping("/chatroom")
    public ResponseEntity<StatusResponseDto> createChatRoom(@RequestBody @Valid final ChatRequestDto requestDto, @RequestParam(required = false) Integer memberNo,
                                                            BindingResult bindingResult) {

        if (bindingResult.hasErrors()) {
            return ResponseEntity.badRequest().body(StatusResponseDto.addStatus(400));
        }

        // ์ฑ„ํŒ…๋ฐฉ์„ ๋งŒ๋“ค์–ด์ค€๋‹ค.
        Chat chat = chatService.makeChatRoom(memberNo, requestDto);
        return ResponseEntity.ok(StatusResponseDto.addStatus(200, chat));
    }

    // ์ฑ„ํŒ…๋‚ด์—ญ ์กฐํšŒ
    @GetMapping("/chatroom/{roomNo}")
    public ResponseEntity<ChattingHistoryResponseDto> chattingList(@PathVariable("roomNo") Integer roomNo, @RequestParam(required = false) Integer memberNo) {
        ChattingHistoryResponseDto chattingList = chatService.getChattingList(roomNo, memberNo);
        return ResponseEntity.ok().body(chattingList);
    }

    // ์ฑ„ํŒ…๋ฐฉ ๋ฆฌ์ŠคํŠธ ์กฐํšŒ
    @GetMapping("/chatroom")
    public ResponseEntity<List<ChatRoomResponseDto>> chatRoomList(@RequestParam(value = "tradeBoardNo", required = false) Integer tradeBoardNo,
                                                                  @RequestParam(value = "memberNo", required = false) Integer memberNo) {
        List<ChatRoomResponseDto> chatList = chatService.getChatList(memberNo, tradeBoardNo);
        return ResponseEntity.ok(chatList);
    }

    @MessageMapping("/message")
    public void sendMessage(@Valid Message message, @Header("Authorization") final String accessToken) {
        chatService.sendMessage(message, accessToken);
    }

    @MessageExceptionHandler
    @SendTo("/error")
    public String handleException(Exception e) {

        return "WebSocket ๋ฉ”์‹œ์ง€ ํ•ธ๋“ค๋Ÿฌ์—์„œ ์˜ˆ์™ธ๊ฐ€ ๋ฐœ์ƒํ–ˆ์Šต๋‹ˆ๋‹ค: " + e;
    }

    // ์ฑ„ํŒ…๋ฐฉ ์ ‘์† ๋Š๊ธฐ
    @PostMapping("/chatroom/{chatroomNo}")
    public ResponseEntity<HttpStatus> disconnectChat(@PathVariable("chatroomNo") Integer chatroomNo,
                                                     @RequestParam(value = "nickname", required = false) String nickname) {
        chatRoomService.disconnectChatRoom(chatroomNo, nickname);
        return ResponseEntity.ok(HttpStatus.ACCEPTED);
    }

    // ํŒ๋งค์ž๊ฐ€ ์ฐธ๊ฐ€ํ•œ ์ฑ„ํŒ…๋ฐฉ ์กด์žฌํ•˜๋Š”์ง€ ์กฐํšŒ
    @GetMapping("/chatroom/exist/seller")
    public Boolean existChatRoomBySeller(@RequestParam Integer tradeBoardNo, @RequestParam Integer memberNo) {
        return chatService.existChatRoomBySeller(tradeBoardNo, memberNo);
    }
}

8. ๋งˆ๋ฌด๋ฆฌ


์ด๋ฒˆ ํฌ์ŠคํŒ…์—์„œ ์ฑ„ํŒ… ๊ธฐ๋Šฅ ๊ตฌํ˜„ ๊ณผ์ •์„ ํฌ์ŠคํŒ… ํ•ด๋ดค์Šต๋‹ˆ๋‹ค. ๋‹ค์Œ ํฌ์ŠคํŒ…์—์„œ๋Š” SSE๋ฅผ ํ™œ์šฉํ•œ ์•Œ๋ฆผ ๊ธฐ๋Šฅ์— ๊ด€ํ•ด ํฌ์ŠคํŒ… ํ•˜๋„๋ก ํ•˜๊ฒ ์Šต๋‹ˆ๋‹ค!!

์ฐธ๊ณ 


https://velog.io/@joonghyun/Springboot-MongoDB-Springboot%EC%99%80-MongoDB-%EC%97%B0%EA%B2%B0%ED%95%98%EA%B8%B0
https://velog.io/@ch4570/Stomp-Kafka%EB%A5%BC-%EC%9D%B4%EC%9A%A9%ED%95%9C-%EC%B1%84%ED%8C%85-%EA%B8%B0%EB%8A%A5-%EA%B0%9C%EB%B0%9C%ED%95%98%EA%B8%B0-with-Spring-Boot-2-Kafka-%EC%84%A4%EC%B9%98-MongoDB-Stomp-%EC%84%A4%EC%A0%95

profile
๋ฐฑ์—”๋“œ ๊ณต๋ถ€์ค‘์ž…๋‹ˆ๋‹ค!

2๊ฐœ์˜ ๋Œ“๊ธ€

comment-user-thumbnail
2024๋…„ 10์›” 11์ผ

๋ง‰๋ง‰ํ–ˆ๋Š”๋ฐ ๋„ˆ๋ฌด ์ž˜ ๊ณต๋ถ€ํ•˜๊ตฌ ๊ฐ‘๋‹ˆ๋‹ค ใ… ใ… 

1๊ฐœ์˜ ๋‹ต๊ธ€

๊ด€๋ จ ์ฑ„์šฉ ์ •๋ณด