Cassandra-Spring boot-STOMP 실시간 통신

Sieun Sim·2020년 5월 31일
0

서버개발캠프

목록 보기
18/21

그냥 socket이 세션을 하나씩 관리해야 한다면, stomp를 사용하면 pub/sub 형태의 발행자/구독자들 형태로 관리할 수 있다. 우리 프로젝트의 경우 bts column에 보여줄 트윗은 모든 사용자에게 동일하기 때문에 하나의 발행자만 있으면 된다. 채팅에서 많이 쓰는 형태인데 우리 프로젝트에서도 하나의 발행자→많은 구독자라는 면에서 적절한 것 같아 사용하려 한다.

pom.xml

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

Stomp config

@RequiredArgsConstructor
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableSimpleBroker("/topic");
        config.setApplicationDestinationPrefixes("/pub");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws").setAllowedOrigins("*").withSockJS();
    }
}

Java의 Message

java 객체를 다루면서 프레임워크 차원에서 쓰는 메타데이터로 래핑해주는 것이다. payload와 header로 나눠지는데 payload는 아무 자바 객체고 헤더는 name:value 형태의 맵이나 스트링이다. 아래와 같이 MessageBuilder로 메시지를 만들 수 있다.

Message<String> message = MessageBuilder.withPayload("Message Payload")
    .setHeader("Message_Header1", "Message_Header1_Value")
    .setHeader("Message_Header2", "Message_Header2_Value")
    .build();

그냥 소켓을 사용하면 @EnableWebSocket , WebSocketConfigurer 를 사용한다. Stomp를 사용하면 MessageBroker가 추가되는데 말그대로 위에 말한 message의 broker 설정을 할 수 있다고 이해했다. 그 대표가 STOMP인 것뿐이고.

SimpMessageSendingOperations

A specialization of MessageSendingOperations with methods for use with the Spring Framework support for Simple Messaging Protocols (like STOMP).

메시지를 도착지까지 보내는 MessageSendingOperations<Destination> 을 스프링 프레임워크에 맞춘것. 문서에 대놓고 STOMP같은걸 지원하기 위함이라고 써져있다.

convertAndSend

Convert the given Object to serialized form, possibly using a MessageConverter, wrap it as a message and send it to the given destination.

컨트롤러에서는 Cassandra에서 정해진 시간마다 쿼리를 해 구독자들이 데이터를 가지고 갈 수 있도록 해당 토픽에 전송할 것이다. @Scheduled 어노테이션을 통해 쉽게 설정할 수 있다.

Stomp Controller

@EnableScheduling
@RequiredArgsConstructor
@Controller
public class CasanController {
    private final SimpMessageSendingOperations messagingTemplate;

        @Scheduled(fixedRate = 5000)
    public void greeting() {
        List<Casan> c= casanRepository.findCasansBy(LocalDate.now(), LocalTime.now().minusSeconds(5));
        System.out.println(c);
        messagingTemplate.convertAndSend("/topic/message", c);
    }
}

Cassandra

최소 3대 이상 구성해야 하며 노드가 추가될수록 성능이 좋아진다.

Column Family: 하나의 Key에 여러개의 컬럼이 달려 있는 형태로, 컬럼들의 집합

데이타간의 복잡한 관계 정의(Foreign Key)등이 필요없고, 대용량과 고성능 트렌젝션을 요구하는 SNS (Social Networking Service)에 많이 사용되고 있으며 성능이나 확장성과 안정성이 뛰어납니다.

→ 처음 카산드라를 글로만 볼 때는 몰랐는데 직접 모델을 짜보니 무슨말인지 알겠다. partition key, clustering key로 단순하게 클러스터를 나누고 또 접근해서 빠른 듯 하다. 복잡한 관계 정의가 필요없다기 보다는 모델링 자체를 복잡한 관계가 없도록 만드는 게 핵심인 듯 하다. 같은 데이터도 관계가 필요하다면 아예 다른 테이블로 만들어 버리는 느낌?

Cassandra vs MongoDB

  • Cassandra 노드가 추가될수록 MonogoDB 보다 훨씬 나은 선형적인 성능 향상을 보인다.
  • 다중 Index가 필요한 구조라면 MongoDB를 선택하고, 데이터 항목 변경이 많고 unique access가 많은 경우라면 Cassandra가 적합

우분투에 Cassandra 설치

https://www.hostinger.com/tutorials/set-up-and-install-cassandra-ubuntu/


Spring boot와 Cassandra 연결

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-cassandra</artifactId>
</dependency>

시작하면 내부적으로 DataStax Java driver 3.7.2 for Apache Cassandra 를 사용하는 것을 볼 수 있다. 관련 설정에 대해 Datastax Java driver로 검색하면 이것저것 알 수 있다. 이 의존성을 다른 DB 연결하던것과 마찬가지로 손쉽게 설정할 수 있다.

Repository

@Repository
public interface CasanRepository extends CrudRepository<Casan, UUID> {
}

application.properties

spring.data.cassandra.keyspace-name=my_keyspace
spring.data.cassandra.contact-points=localhost
spring.data.cassandra.port=9042
spring.data.cassandra.schema-actimon=create_if_not_exists //없으면 만들기 정책

Controller

@Autowired
private CasanRepository casanRepository;

@GetMapping(path = "/test")
public @ResponseBody List<Casan> test() {
    List<Casan> products = new ArrayList<>();
    casanRepository.findAll().forEach(products::add); //fun with Java 8
    return products;
}

Cassandra CDC

처음는 카산드라에서 변화가 생기면 즉각적으로 커넥션 풀 차원에서 연결되어 있는 data sending 서버가 변화를 알 수 있게 하고 싶었다. DB 내에서의 trigger과 달리 서버에 바로 신호를 주는 것은 어려운 일인 것 같다. 그나마 가까운 솔루션은 Change Data Capture(CDC)로 DB 시스템에서 커밋 로그를 파일에 쓰는 것 자체에 접근해서 그 변화를 감지한다는 것이다.. 자료가 정말 없고 몇개 없는 자료에서도 자료가 없다고 언급한다. 하드코딩으로 서버가 로그파일을 감시해 변화를 알아채는 프로젝트를 발견했는데 우리가 쓰기엔 개발 시간도 부족하고 어느 정도의 속도가 나올지 감이 안왔다. 그리고 CDC를 활성화해 log파일을 봤는데 제 때 제대로 반영되고 있는건지 잘 모르겠다.. 그래서 접근 방식을 달리해 카산드라에 저장 자체를 시간으로 쿼리하기 최적화된 형태로 하고 perodical polling을 해보기로 했다.

CDC 활성화하기

/etc/cassandracassandra.yaml 파일에서 cdc_enabled=true로 바꿔준다. 이 설정을 적용시키려면 카산드라를 껐다 켜야한다.

$ sudo service cassandra stop
$ sudo service cassandra start

Cassandra data 저장 구조와 적절한 partitioning에 대해

쿼리를 날렸더니 카산드라에서 ALLOW FILTER를 요구해본다면 데이터 모델의 구조에 대해 다시 생각해볼 필요가 있다. 차라리 같은 데이터에 대해 모델을 여러개 만들지언정, 카산드라의 저장 가치관(?)에 반해 partition - cluster의 순서를 잘 안지키면 서로 다른 파티션이나 클러스터에 대해 강제적인 검색을 해야하므로 지양되는듯 하다.

스택오버플로우에서 찾은 아래 글이 이해하는데에 아주 도움이 됐다.


To understand this, you need to understand how cassandra stores data. The first key in the primary key is called the partition key. It defines the partition the row belongs to. All rows in a partition are stored together, and replicated together. Inside a partition, rows are stored according to the clustering keys. These are the columns in the PK that's not the partition key. So, if your PK is (a, b, c, d), a defines the partition. And in a particular partition (say, a = a1), the rows are stored sorted by b. And for each b, the rows are stored sorted by c...and so on. When querying, you hit one (or a few partitions), and then need to specify every successive clustering key up until the key you're looking for. These have to exact equalities except for the last clustering column specified in your query, which may be a range query.

In the previous example, you could thus do

where a = a1 and b > b1 where a = a1 and b=b1 and c>c1 where a = a1 and b=b1 and c=c1 and d > d1

but can't do this:

where a=a1 and c=c1

To do that, you'd need "allow filtering" (realistically, you should look at changing your model, or denormalizing at that point).

Now, on to your question about making every column part of the PK. You could do that, but remember, all writes in Cassandra are upserts. Rows are identified by their primary key. If you make every column part of the PK, you'll not be able to edit a row. You're not allowed to update the value of any column that's in the primary key.

Cassandra with time series

CREATE TABLE test_keyspace.test_table_ex1 ( 
    code text, 
    location text, 
    sequence text, 
    description text, 
    PRIMARY KEY (code, location)
);

이런식으로 있으면 PRIMARY KEY에서 처음에 오는 code가 partition key, 두번째인 location이 cluster key가 된다.

여러 시도를 해본 결과 날짜로 찾고 싶을 때는 partition key를 localdate(YYYY-MM-DD)로, cluster key를 localtime으로 하면 적절한 것 같다. @PrimaryKeyColumn 어노테이션을 이용해 partition key와 cluster key를 손쉽게 지정해줄 수 있다.

Domain

@Data
@Table("examples")
public class Casan implements Serializable {

    @CassandraType(type = DataType.Name.UUID)
    private UUID id= UUIDs.timeBased();

    private String desc;

    @PrimaryKeyColumn(ordinal=1, type = PrimaryKeyType.CLUSTERED, ordering = Ordering.DESCENDING)
    private LocalTime create_at= LocalTime.now();

    @PrimaryKeyColumn(ordinal = 0, type = PrimaryKeyType.PARTITIONED)
    private LocalDate date = LocalDate.now();
}

위의 객체대로 examples table을 만들어 cqlsh에서 desc examples를 해보면

CREATE TABLE my_keyspace.examples (
    date date,
    create_at time,
    "desc" text,
    id uuid,
    PRIMARY KEY (date, create_at)
) WITH CLUSTERING ORDER BY (create_at DESC)

이렇게 만들어졌다는 것을 알 수 있다.

Client(React)

import React from 'react';
import { Client } from '@stomp/stompjs';

componentDidMount() {
    this.client = new Client();
    this.client.configure({
    brokerURL: 'ws://localhost:8089/ws/websocket',
    onConnect: () => {
      console.log('onConnect');
      this.client.subscribe('/topic/message', message => {
        console.log(message.body);
      });
    },
    // Helps during debugging, remove in production
    debug: (str) => {
      console.log(new Date(), str);
    }
  });

  this.client.activate();
}

정말 요상하게도 Client에서 연결하는 endpoint url 뒤에 /websocket 을 붙여주어야만 정상적으로 동작한다. spring boot의 config에서 endpoint를 분명히 /ws로 설정했는데 하나의 약속인지 뭔지 모르겠다 구글링해도 저걸 붙여야된다는걸 겨우 찾았을 뿐 이유는 못찾았다.

참고자료

MessageSendingOperations (Spring Framework 5.2.3.RELEASE API)

Apache Cassandra 톺아보기 - 1편 : TOAST Meetup

Change Data Capture (CDC)

Cassandra to Kafka Data Pipeline (Part 2) - DZone Big Data

https://docs.datastax.com/en/tutorials/Time_Series.pdf

https://stackoverflow.com/questions/29314578/cassandra-query-making-cannot-execute-this-query-as-it-might-involve-data-filt

0개의 댓글