Stomp + Kafka를 이용한 채팅 기능 개발하기 - (with Spring Boot) #2 (Kafka 설치 + MongoDB & Stomp 설정)

DevSeoRex·2023년 6월 13일
23
post-thumbnail

🥳 Kafka & Zookeeper 설치와 셋팅!

Kafka와 Zookeeper는 docker-compose를 이용해서 이미지를 pull받고 띄워주도록 하겠습니다.

# compose 파일 버전
version: '3'
services:
  # 서비스 명
  zookeeper:
    # 사용할 이미지
    image: wurstmeister/zookeeper
    # 컨테이너명 설정
    container_name: zookeeper
    # 접근 포트 설정 (컨테이너 외부:컨테이너 내부)    
    ports:
      - "2181:2181"
  # 서비스 명
  kafka:
    # 사용할 이미지
    image: wurstmeister/kafka
    # 컨테이너명 설정
    container_name: kafka
    # 접근 포트 설정 (컨테이너 외부:컨테이너 내부)
    ports:
      - "9092:9092"
    # 환경 변수 설정
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost     
      KAFKA_CREATE_TOPICS: "Topic:1:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    # 볼륨 설정
    volumes:
      - /var/run/docker.sock
    # 의존 관계 설정
    depends_on:
      - zookeeper

docker-compose.yml 파일에 위에 작성된 코드를 붙여 넣어 주시고 파일이 있는 경로에서 아래의 명령어를 입력해줍니다.

$ docker-compose up - d

성공적으로 실행되고 있는 것을 볼 수 있습니다.

Kafka와 Zookeeper 사용을 위해 기본적인 설정을 해주겠습니다.

Zookeeper 설정하기

  • Zookeeper Bash로 이동합니다.

    $ docker exec -i -t zookeeper bash

  • Zookeeper 서버를 실행합니다.

    $ bash bin/zkServer.sh start /opt/zookeeper-3.4.13/conf/zoo.cfg -demon

Zookeeper 서버의 2181 포트가 Listen 상태로 변경되어 있는 걸 확인할 수 있습니다.

Kafka 설정하기

  • Kafka Bash로 이동합니다.

    $ docker exec -i -t kafka bash

  • Kafka 서버를 실행합니다.

    $ kafka-server-start.sh -daemon

Kafka 서버 설정까지 완료 되었습니다.

Kafka는 Topic을 생성해서 해당 주제(Topic)를 구독해야 pub/sub을 통한 메시지의 생산과 소비가 가능합니다.
따라서 채팅 기능을 개발할때 사용할 Topic을 생성하도록 하겠습니다.

  • Kafka Bash로 이동한 상태에서 아래의 명령어로 Topic을 생성합니다.

    $ kafka-topics.sh --create --zookeeper zookeeper:2181 --topic { Topic 이름 } --partitions 1 --replication-factor 1

  • Topic이 잘 생성됬는지 확인합니다.

    $ kafka-topics.sh --list --zookeeper zookeeper


Kafka에서 사용할 Topic을 생성하고, Zookeeper와 Kafka의 셋팅은 모두 마쳤습니다.

😎 MongoDB 설치하기!

MongoDB 설치는 간단하기 때문에 docker-compose를 사용하지 않고 바로 docker 이미지를 pull 받아서 실행했습니다.

  • MongoDB 이미지 pull 받기

    $ docker pull mongo

  • MongoDB 컨테이너 생성 및 실행하기

    $ docker run --name mongodb-container -v ~/data:/data/db -d -p 27017:27017 mongo

설치해야 하는 부분은 모두 완료가 되었습니다. 이제 Spring에서의 의존성 추가와 설정을 시작해보겠습니다.

🧑‍💻 Kafka & Stomp & MongoDB Spring 셋팅하기!

  • build.gradle 파일에 의존성을 추가해주겠습니다.

MongoDB 설정하기

  • application-mongo.yml 파일을 작성해줍니다.

  • 설정 클래스에 사용할 properties 파일을 작성해줍니다.

@Data
@Component
@ConfigurationProperties(prefix = "mongodb")
public class MongoProperties {
    String client;
    String name;
}
  • MongoConfig 클래스를 작성합니다.
@Configuration
@RequiredArgsConstructor
@EnableMongoRepositories(basePackages = "com.adoptpet.server.adopt.mongo")
public class MongoConfig {

    private final MongoProperties mongoProperties;

    @Bean
    public MongoClient mongoClient() {
        return MongoClients.create(mongoProperties.getClient());
    }

    @Bean
    public MongoTemplate mongoTemplate() {
        return new MongoTemplate(mongoClient(), mongoProperties.getName());
    }
}

@EnableMongoRepositories의 경로나 다른 필요한 부분들은 직접 커스텀해서 쓰시면 될 거 같습니다.

MongoConfig 클래스를 작성하지 않고 MongoRepository를 상속받아서 사용하는 것도 가능하지만, 동적 쿼리 문제를 해결하려면 MongoTemplate의 도움을 받아야 하기 때문에 저는 설정 클래스를 따로 만들어 주었습니다.

  • MongoDB에서 메시지 저장에 사용할 도메인 모델을 만들어주겠습니다.
@Document(collection = "chatting")
@Getter @ToString
@Setter
@AllArgsConstructor @Builder
@NoArgsConstructor
// MongoDB Chatting 모델
public class Chatting {

    @Id
    private String id;
    private Integer chatRoomNo;
    private Integer senderNo;
    private String senderName;
    private String contentType;
    private String content;
    private LocalDateTime sendDate;
    private long readCount;

}

Kafka와 Stomp Client 설정하기

  • Kafka에서 메시지 전달에 사용할 도메인 모델을 작성합니다.
@Getter
@ToString
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Message implements Serializable {

    private String id;

    @NotNull
    private Integer chatNo;

    @NotNull
    private String contentType;

    @NotNull
    private String content;

    private String senderName;

    private Integer senderNo;

    @NotNull
    private Integer saleNo;

    private long sendTime;
    private Integer readCount;
    private String senderEmail;

    public void setSendTimeAndSender(LocalDateTime sendTime, Integer senderNo, String senderName, Integer readCount) {
        this.senderName = senderName;
        this.sendTime = sendTime.atZone(ZoneId.of("Asia/Seoul")).toInstant().toEpochMilli();
        this.senderNo = senderNo;
        this.readCount = readCount;
    }

    public void setId(String id) {
        this.id = id;
    }

    public Chatting convertEntity() {
        return Chatting.builder()
                .senderName(senderName)
                .senderNo(senderNo)
                .chatRoomNo(chatNo)
                .contentType(contentType)
                .content(content)
                .sendDate(Instant.ofEpochMilli(sendTime).atZone(ZoneId.of("Asia/Seoul")).toLocalDateTime())
                .readCount(readCount)
                .build();
    }
  • Kafka consumer 설정 클래스를 작성합니다.
@EnableKafka
@Configuration
public class ListenerConfiguration {

    // KafkaListener 컨테이너 팩토리를 생성하는 Bean 메서드
    @Bean
    ConcurrentKafkaListenerContainerFactory<String, Message> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Message> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    // Kafka ConsumerFactory를 생성하는 Bean 메서드
    @Bean
    public ConsumerFactory<String, Message> consumerFactory() {
        JsonDeserializer<Message> deserializer = new JsonDeserializer<>();
        // 패키지 신뢰 오류로 인해 모든 패키지를 신뢰하도록 작성
        deserializer.addTrustedPackages("*");

        // Kafka Consumer 구성을 위한 설정값들을 설정 -> 변하지 않는 값이므로 ImmutableMap을 이용하여 설정
        Map<String, Object> consumerConfigurations =
                ImmutableMap.<String, Object>builder()
                        .put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
                        .put(ConsumerConfig.GROUP_ID_CONFIG, "adopt")
                        .put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
                        .put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer)
                        .put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
                        .build();

        return new DefaultKafkaConsumerFactory<>(consumerConfigurations, new StringDeserializer(), deserializer);
    }

}
  • kafka producer 설정 클래스를 작성합니다.
@EnableKafka
@Configuration
public class ProducerConfiguration {

    // Kafka ProducerFactory를 생성하는 Bean 메서드
    @Bean
    public ProducerFactory<String, Message> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigurations());
    }

    // Kafka Producer 구성을 위한 설정값들을 포함한 맵을 반환하는 메서드
    @Bean
    public Map<String, Object> producerConfigurations() {
        return ImmutableMap.<String, Object>builder()
                .put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
                .put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
                .put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class)
                .build();
    }

    // KafkaTemplate을 생성하는 Bean 메서드
    @Bean
    public KafkaTemplate<String, Message> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

이제 마지막으로 WebSocket을 활성화하고 메시지 브로커를 사용하기 위한 설정을 해보겠습니다.

  • WebSocketConfiguration 클래스를 작성합니다.
@Configuration
@RequiredArgsConstructor
@EnableWebSocketMessageBroker // WebSocket을 활성화하고 메시지 브로커 사용가능
public class WebSocketConfiguration implements WebSocketMessageBrokerConfigurer {

    private final StompHandler stompHandler;

    // STOMP 엔드포인트를 등록하는 메서드
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/chat") // STOMP 엔드포인트 설정
                .setAllowedOriginPatterns("*") // 모든 Origin 허용 -> 배포시에는 보안을 위해 Origin을 정확히 지정
                .withSockJS(); // SockJS 사용가능 설정
    }

    // 메시지 브로커를 구성하는 메서드
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableSimpleBroker("/subscribe"); // /subscribe/{chatNo}로 주제 구독 가능
        registry.setApplicationDestinationPrefixes("/publish"); // /publish/message로 메시지 전송 컨트롤러 라우팅 가능
    }

    // 클라이언트 인바운드 채널을 구성하는 메서드
    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        // stompHandler를 인터셉터로 등록하여 STOMP 메시지 핸들링을 수행
        registration.interceptors(stompHandler);
    }

    // STOMP에서 64KB 이상의 데이터 전송을 못하는 문제 해결
    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registry) {
        registry.setMessageSizeLimit(160 * 64 * 1024);
        registry.setSendTimeLimit(100 * 10000);
        registry.setSendBufferSizeLimit(3 * 512 * 1024);
    }

}

각 설정 클래스에는 모두 주석을 달아 두었습니다.
WebSocketConfiguration 클래스에 중요한 부분이 몇가지 있어서 이 부분만 뜯어 보겠습니다.

  • Stomp 연결을 프론트에서 시도할때 요청을 보낼 엔드포인트를 지정하는 부분입니다.

Stomp를 지원하지 않는 브라우저나 환경에서 사용할경우 SockJS를 사용한 emulataion을 하기 위해 SockJS 사용이 가능하도록 설정해주었습니다.

  • 메시지 브로커를 구성하는 메서드 작성

간단히 예를 들자면, A라는 사람은 1번 채팅방에 있고 B와 C는 2번 채팅방에 있습니다.
이런 상황에서 C가 채팅을 보낸다면 B는 같은 채팅방에 있으므로 메시지를 받겠지만 A가 받는일은 없어야 합니다.

프론트앤드에서 채팅방에 유저가 접속하게 되면, /subscribe/{채팅방 번호}로 접속한 채팅방을 구독하도록 구현하였습니다.

  • A는 1번 채팅방에 접속 -> A가 /subscribe/1을 구독함
  • B와 C는 2번 채팅방에 접속 -> B와 C는 /subscribe/2를 구독
  • C가 메시지를 보냄 -> 현재 같은 채팅방을 구독중인 B에게만 메시지가 전달됨

이런 구조를 가져가기 위해서 채팅방번호로 구독하는 채팅방을 구분하도록 처리하였습니다.

마지막으로 /publish/{Message Mapping의 엔트포인트}로 send 요청을 보내 메시지를 전송할 수 있습니다.
이 부분은 채팅 기능을 구현하면서 따로 설명드리도록 하겠습니다.

  • 클라이언트 인바운드 채널 구성을 위한 메서드

현재 만들고 있는 애플리케이션에서는 JWT 인증 절차가 채팅에서도 적용이 필요합니다.
따라서 JWT 토큰을 검증하고 인증 절차를 수행해줄 interceptor를 만들어주고, 등록해주었습니다.

StompHandler 클래스를 채팅 기능 구현 포스팅에서 작성하도록 하겠습니다.

  • 데이터 전송 크기 문제 해결

AWS S3에서 사진을 업로드하는 것은 요금이 어느정도 지출되는 작업이기 때문에, 팀원과 협의를 통해서 base64 문자열로 인코딩해서 서버로 보내줄 것을 요청했습니다.

문자열로 인코딩한 이미지는 용량이 64KB를 초과해서 서버가 에러를 보내고 있어서 STOMP에서 큰 용량의 메시지를 보낼 수 있도록 설정해주는 부분입니다.

🤤 설정 끝! 개발 시작!

이제 드디어 설정이 모두 끝났습니다.
다음 포스팅부터는 본격적으로 채팅 기능 개발을 위한 비즈니스 로직과 컨트롤러 등을 작성해보겠습니다.

오늘도 읽어주셔서 감사합니다.

🙇

참고한 레퍼런스

10개의 댓글

comment-user-thumbnail
2023년 6월 14일

멋있습니다 :)

1개의 답글
comment-user-thumbnail
2023년 6월 14일

데브렉스 서 짱입니다..
근데 데브서로 바뀌셨네예~?

1개의 답글
comment-user-thumbnail
2024년 7월 11일

안녕하세요? 좋은 글 감사합니다.
혹시 여기서 카프카를 외부 브로커로 사용한다 했을 때, enableSimpleBroker가 되어있어야만 클라이언트와 Websocket 메시지를 주고 받을 수 있는 것인가요?

글 제목이 Stomp + Kafka를 이용한 채팅 기능이다보니, 제가 기대한 부분은 어떻게 스프링-카프카를 사용하여 클라이언트와 채팅을 주고받는 지가 궁금했습니다. 그런데 스프링-카프카가 연동되는 설정(Producer/Consumer)만 있어서 여쭤봅니다..!

1개의 답글
comment-user-thumbnail
2024년 7월 21일

글 너무 잘 읽고 있습니다!! 혹시 다음 글은 없는건가요??

1개의 답글