이번 목표는 "메시지브로커에 메시지가 발행이 될 때마다 가격을 업데이트해서 보여주는 것"이다.

여기서 중간에 서버를 또 거치면 지연시간이 발생할텐데, 메시지브로커에 발행된 메시지를 그냥 클라이언트에 직접 꽂으면 빠르지 않을까? 라는 생각이 들었다.
Redis 보안 관리 공식문서
https://redis.io/docs/latest/operate/oss_and_stack/management/security/
For instance, in the common context of a web application implemented using Redis as a database, cache, or messaging system, the clients inside the front-end (web side) of the application will query Redis to generate pages or to perform operations requested or triggered by the web application user.
In this case, the web application mediates access between Redis and untrusted clients (the user browsers accessing the web application).
신뢰할 수 없는 클라이언트로부터 Redis에 직접 쿼리를 보낼 수 있는 정보가 노출이 되고 (Redis 서버 정보), 이로 인해 보안, 안정성 문제가 발생한다. 라는 내용이다.
그래서 백엔드를 통해 접근하는 방식으로 구현한다.
이렇게 정리하고보니 Database를 프론트에 직접 연결해본적이 없다는 게 생각났다.
우선 클라이언트에 실시간 시세정보가 보여지기까지 과정을 생각해보자.
과정
1. 메시지브로커로 활용하는 Redis로부터 price 채널 내에 발행된 메시지를 가져와야한다.
2. 가져온 메시지를 비트코인, 이더리움 시세로 분리해야한다.
3. 각각 웹소켓을 연결을 통해 클라이언트로 데이터를 전달한다.
이러한 과정에서 구현에 있어 필요한 것은 무엇일까?
구현
1. Redis 서버와 연결
2. Websocket 설정 및 메시지 발행
3. React로 Websocket 연결 구독
package com.crpyto_trading.demo.config;
import com.crpyto_trading.demo.infra.redis.RedisPriceSubscriber;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory connectionFactory) {
RedisTemplate<String, String> template = new RedisTemplate<>();
template.setConnectionFactory(connectionFactory);
template.setKeySerializer(new StringRedisSerializer());
template.setValueSerializer(new StringRedisSerializer());
return template;
}
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(
RedisConnectionFactory connectionFactory,
RedisPriceSubscriber subscriber
) {
RedisMessageListenerContainer container =
new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(
subscriber,
new PatternTopic("price:*") // 티커별 구독
);
return container;
}
}
Redis Pub/Sub은 메시지가 발행되면 구독자에게 즉시 전달되는 구조로 메시지를 수신하기 위해 Listener를 등록한다.
Spring에서는 RedisMessageListenerContainer를 통해 Redis 서버와 연결하고, 특정 채널 패턴에 대해 MessageListener를 구현한 객체를 등록한다. 여기서 구현 객체는 subscriber이다.
이후 메시지가 발행되면 컨테이너가 해당 Listener의 onMessage() 메서드를 호출하여 메시지를 처리한다.
package com.crpyto_trading.demo.infra.redis;
import com.crpyto_trading.demo.service.PriceStreamService;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
@Component
public class RedisPriceSubscriber implements MessageListener {
private final PriceStreamService priceStreamService;
public RedisPriceSubscriber(PriceStreamService priceStreamService) {
this.priceStreamService = priceStreamService;
}
@Override
public void onMessage(Message message, byte[] pattern) {
String payload = new String(message.getBody(), StandardCharsets.UTF_8);
String channel = new String(message.getChannel(), StandardCharsets.UTF_8);
// 받아온 메시지를 처리 (Service에 처리 로직 구현)
priceStreamService.onPriceMessage(channel, payload);
}
}
Redis PUB/SUB으로부터 받은 메시지를 처리하는 onMessage를 구현하는데,
Redis로부터 온 데이터를 RedisTemplate를 활용해서 다뤄야하는게 아닌가? 싶지만
Redis에 접근해서 저장, 조회를 하는 것이 아니니깐 필요없다.
priceStreamService를 통해 받아온 메시지를 처리한다.
package com.crpyto_trading.demo.service;
import com.crpyto_trading.demo.infra.websocket.outbound.PriceWebSocketPublisher;
import org.springframework.stereotype.Service;
@Service
public class PriceStreamService {
private final PriceWebSocketPublisher publisher;
public PriceStreamService(PriceWebSocketPublisher publisher) {
this.publisher = publisher;
}
public void onPriceMessage(String channel, String payloadJson) {
// channel: price:BTCUSDT
String symbol = extractSymbol(channel);
publisher.publish(symbol, payloadJson);
}
private String extractSymbol(String channel) {
// "price:BTCUSDT" → "BTCUSDT"
int idx = channel.indexOf(':');
return (idx > -1) ? channel.substring(idx + 1) : channel;
}
}
종목별 시세 구독을 처리하기 위해 extractSymbol로 종목 정보를 추출한다.
클라이언트에게 실시간 시세 정보를 전달하기 위해, onPriceMessage 메서드에서 PriceWebSocketPublisher를 통해 메시지를 발행한다.
메시지를 어떻게 발행하는지는 아래...
package com.crpyto_trading.demo.config;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ws")
.setAllowedOriginPatterns("*")
.withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableSimpleBroker("/topic");
registry.setApplicationDestinationPrefixes("/app");
}
}
registerStompEndpoints 메서드를 통해 클라이언트와의 실시간 통신을 위한 WebSocket 엔드포인트를 설정하고,
configureMessageBroker 메서드를 통해 Spring의 STOMP 프로토콜을 기반으로 메시지 브로커를 활성화한다.
withSockJS()는 WebSocket 연결이 불가능한 환경에서도 통신을 유지하기 위해 HTTP 기반 대체 전송 방식을 제공한다.
enableSimpleBroker는 서버 -> 클라이언트, setApplicationDestinationPrefixes 클라이언트 -> 서버 방향으로 라우팅할때 필요한 prefix(접두사)를 세팅한다.
package com.crpyto_trading.demo.infra.websocket.outbound;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Component;
@Component
public class PriceWebSocketPublisher {
private final SimpMessagingTemplate messagingTemplate;
public PriceWebSocketPublisher(SimpMessagingTemplate messagingTemplate) {
this.messagingTemplate = messagingTemplate;
}
public void publish(String symbol, String payloadJson) {
// 예: /topic/price/BTCUSDT
messagingTemplate.convertAndSend("/topic/price/" + symbol, payloadJson);
}
}
STOMP 프로토콜 위에 메시지를 발행하기위해 SimpMessagingTemplate을 활용한다.
publish 메서드에서 enableSimpleBroker에서 설정한 접두사가 포함된 목적지로 데이터를 전송하고,
이를 통해 STOMP 프로토콜을 활용하는 메시지 브로커가 해당 주소를 구독 중인 클라이언트들에게 실시간으로 메시지를 발행한다.
public void onPriceMessage(String channel, String payloadJson) {
// channel: price:BTCUSDT
String symbol = extractSymbol(channel);
publisher.publish(symbol, payloadJson);
}
목적지 구분을 통해 종목별 실시간 시세를 구분하려고 symbol, payloadJson 데이터를 publish에 넘겨준 것이다.
클라이언트(브라우저)에서 이걸 어떻게 구독할까? 개념 설명 이후에 있다.
STOMP (Simple/Stream Text Oriented Message Protocol)
1. websocket 위에서 동작하는 문자 기반 메세징 프로토콜
2. 클라이언트와 서버가 전송할 메세지의 유형, 형식, 내용들을 정의
3. pub/sub 구조
역할은 Redis Pub/Sub과 비슷한데, 차이는 다음과 같다.
| 구분 | STOMP | Redis Pub/Sub |
|---|---|---|
| 역할 | 서버와 클라이언트 간 메시지 전달 | 서버와 서버 간 메시지 전달 |
| 연결 방식 | Websocket 연결을 통해 전송 | TCP 소켓 기반 연결 |
| 사용 프로토콜 | STOMP (WebSocket 위에서 동작) | Redis 자체 프로토콜(RESP) |
| 전송 계층 | Websocket → TCP | TCP |
여기서 TCP라는 용어가 있는데, OSI 7계층의 전송계층에서 양방향 통신을 가능하게 해주는 프로토콜이다.
서로 연결하기 위해 3 Way Handshake, 전송 데이터 순서 보장... 등등 뭐 많은데
[HTTP] 1. HTTP 이해를 위한 사전 지식
이전에 강의들으면서 정리해놓은 것 참고하면 된다.
전송 계층 부분에 Websocket → TCP 표시가 되어 있는데,
Websocket 프로토콜을 사용하여 클라이언트와 서버 간의 지속적인 TCP 연결을 수립, 이를 통해 실시간 양방향 데이터 전송을 가능하게 하는 것으로 이해하면 된다.
Websocket이 뭐길래?
HTTP 통신을 위해 생성된 TCP 연결을 WebSocket으로 업그레이드한 뒤, 해당 TCP 연결 위에서 지속적인 양방향 통신을 수행한다.
자세한 개념은 다음 게시글에 정리하겠다.
import { useEffect, useState } from "react";
import { Client } from "@stomp/stompjs";
import SockJS from "sockjs-client";
import "./App.css";
function App() {
const [btcPrice, setBtcPrice] = useState("-");
const [ethPrice, setEthPrice] = useState("-");
useEffect(() => {
const client = new Client({
webSocketFactory: () => new SockJS("http://localhost:8080/ws"),
reconnectDelay: 5000,
onConnect: () => {
// BTC 구독
client.subscribe("/topic/price/BTCUSDT", (message) => {
const data = JSON.parse(message.body);
setBtcPrice(data.price);
});
// ETH 구독
client.subscribe("/topic/price/ETHUSDT", (message) => {
const data = JSON.parse(message.body);
setEthPrice(data.price);
});
},
});
client.activate();
return () => {
client.deactivate();
};
}, []);
return (
<div className="App">
<div className="content">
<div className="box">
<h3>비트코인 시세</h3>
<p>{btcPrice} USDT</p>
</div>
<div className="box">
<h3>이더리움 시세</h3>
<p>{ethPrice} USDT</p>
</div>
<div className="box asset-box">나의 자산정보</div>
<div className="box large">수익률 랭킹</div>
</div>
</div>
);
}
export default App;
STOMP 프로토콜 기반의 Websocket 연결을 통해 시세 정보를 구독하고, 메시지를 수신할 때마다 화면에 표시되는 시세 값을 업데이트한다.
그리고 React에서 소켓통신 그리고 STOMP 프로토콜을 활용하려면 패키지를 받아야한다.
npm install sockjs-client @stomp/stompjs

이러한 구조가 완성이 되었다.

그리고 구현 결과는 단순하긴한데, 이렇다.
구현과정에서 Websocket 개념, STOMP가 뭔지 찾고
STOMP가 Redis PUB/SUB이랑 어떻게 다른지 이해하는 것이 어려웠다.
CS 지식이 여기서 활용되네...
Websocket 구현은 이후 개념들 정리한번해보면서 마무리할 것이다.
다음은 매수/매도를 구현하면서 트랜잭션, 동시성에 대한 고민을 해보겠다.