안녕하세요. 이번에는 Spring으로 SSE에 대해서 알아보겠습니다. 일단 웹 애플리케이션을 개발하다보면 클라이언트 요청이 없어도 서버에서 데이터의 변경 또는 조건이 발생하면 데이터를 전달해줘야 하는 경우가 있습니다.
대표적으로 알림, 랭킹 시스템, 뉴스피드 등 다양한 기능이 있고 이것에 요구사항에 실시간이라는 조건이 추가된다면 단순히 클라이언트 단에서는 setInterval()
을 통하여 특정 시간마다 요청 또는 서버 측에서는 mysql에 짧은 시간동안 polling
을 지속적으로 하여 재요청을 하는 방식이 있습니다.
전통적인 Client-Server 모델의 HTTP 통신에는 이러한 기능을 구현하기 남감합니다. 왜냐하면 기본적으로 클라이언트의 요청이 있어야지 서버가 응답을 보낼 수 있는데 클라이언트가 변경점을 확인하지 못하기 때문입니다.
SSE에 대해서 알아보자
분산서버에서 SSE 문제가 발생하는 상황
분산서버에서 동기화 문제를 해결하기 위한 방법 (MySQL CDC Binlog)
여기서 3번을 보면 동기화 문제를 해결하는 방법이 있습니다. 그런데 일반적으로 이 문제는 Pub/Sub 또는 카프카 등 메세지 방식을 사용하면 해결할 수 있습니다. 하지만 저는 MySQL Polling방식으로 스프링이 MySQL의 변경된 이벤트를 기반으로 해결하겠습니다. 대표적인 키워드는 MySQL binlog 입니다.
SSE (Server-Sent Events)는 클라이언트와 서버 간의 1:1 관계를 유지하면서 서버에서 클라이언트로 데이터를 전송하는 기술입니다. 이벤트가 서버에서 발생할 때마다, 서버는 해당 이벤트를 클라이언트로 전송합니다. 이러한 전송 방식은 HTTP 스트리밍을 통해 이루어지며, HTML5의 표준 기술 중 하나입니다.
이때 SSE의 특징 및 주의해야 되는 부분이 있습니다.
단방향 통신: SSE는 서버에서 클라이언트로의 단방향 통신을 제공합니다. 클라이언트는 서버로부터만 데이터를 수신하고, 서버로 데이터를 보낼 수 없습니다.
HTTP/1.1 및 HTTP/2 프로토콜 제한: HTTP/1.1 프로토콜을 사용할 경우, 브라우저는 한 도메인당 최대 6개의 EventStream을 유지할 수 있습니다. HTTP/2 프로토콜에서는 브라우저와 서버 간의 협상에 따라 최대 100개까지 가능합니다.
UTF-8 인코딩: SSE는 UTF-8 인코딩된 문자열만을 지원합니다. 따라서 서버에서 이벤트 데이터를 전송할 때, 이 데이터를 UTF-8로 인코딩하여 전송해야 합니다. 대부분의 경우, 서버는 JSON 형식으로 이벤트 데이터를 인코딩하여 전송합니다.
브라우저 지원: 현재 Internet Explorer를 제외한 대부분의 최신 브라우저에서 SSE를 지원합니다. JavaScript를 사용하여 클라이언트 측에서 EventSource를 통해 SSE 연결을 생성하고, 전송된 이벤트를 처리할 수 있습니다.
Spring Framework의 지원: Spring Framework는 4.2 버전부터 SseEmitter 클래스를 제공하여 서버 측에서 SSE 통신을 구현할 수 있습니다. 이를 통해 Spring 기반의 애플리케이션에서 SSE를 쉽게 구현하고 활용할 수 있습니다.
GET /connect HTTP/1.1
Accept: text/event-stream
Cache-Control: no-cache
이벤트의 미디어 타입은 text/event-stream을 사용하며 캐싱을 사용하지 않으며 지속적 연결을 사용한다. 또한 http 1.1에서는 keep-alive를 사용하여 지속 연결을 한다.
HTTP 1.0
요청 컨텐츠마다 TCP 커넥션을 맺고 끊음을 반복한다.
요청1 -> 응답1 -> 요청2 -> 응답2 순으로 순차적으로 진행된다. 즉, 응답을 받아야만 다음 작업을 한다.
HTTP 1.1
매 요청마다 TCP 커넥션을 맺고 끊음을 반복하지 않고 keep-alive를 통해 일정 시간 동안 커넥션을 유지한다.
클라이언트는 각 요청에 대한 응답을 기다리지 않고 여러개의 요청을 연속적으로 보낸다.(파이프라이닝) 하지만
각 응답의 처리는 순차적으로 처리된다.
HTTP/1.1 200
Content-Type: text/event-stream
Transfer-Encoding: chunked
Transfer-Encoding: chunked
이라고 적혀져 있다. 이것은 전송 인코딩 메너니즘의 하나로 데이터를 여러 조각으로 분할하여 전송하는데 사용한다. 청크를 통해 데이터를 전송하여 데이터 크기를 사전에 정의하지 않고 실시간으로 전송할 수 있게 한다.메시지의 데이터 필드. EventSource가 데이터로 시작하는 여러 줄의 연속 줄을 받으면, 그것들을 연결하여 각 줄 사이에 줄 바꿈 문자를 삽입합니다. 후행 줄 바꿈이 제거됩니다.
라고 나온다. HTTP/1.1 200 OK
Content-Type: text/event-stream
data: Hello\n\n
location /subscribe {
proxy_pass http://127.0.0.1:8080/subscribe;
proxy_set_header Connection '';
proxy_set_header Cache-Control 'no-cache';
proxy_set_header X-Accel-Buffering 'no';
proxy_set_header Content-Type 'text/event-stream';
proxy_buffering off;
proxy_read_timeout 864000s;
chunked_transfer_encoding on;
}
하지만 현업에서 특정 기술을 도입은 신중하게 생각을 해야되고, 현실상 기술을 학습하여 트레이드 오프를 학습하고 적용을 해도 되겠다고 생각해도 팀원의 반대 또는 유지보수 관점에서 적용이 힘들 수 있다.
그래서 나는 현업에서 Redis는 유지보수 관점 및 팀원들은 새로운 기술적인 도입보다 Polling 방식으로 문제를 해결할 수 있다고 생각하여 Redis Pub/Sub을 통해 문제를 해결하지 못하였다. 그러면 나한테 남은 선택지는 결국 분산 서버의 환경에서는 MySQL을 이용한 방식만 존재를 한다. 이때 MySQL은 관계형 데이터베이스 이기 때문에 MySQL에서 Spring으로 요청을 날릴 수 없다.
그래서 나는 이 문제를 해결하기 위해서 처음에는
1. crontab을 이용한 반복적인 검사
2. spring 스케줄러를 통한 변경 감지
3. insert를 하였을 때 특정 분산 서버에 sse를 reload하게 api call을 발생하게 작성
- 하지만 근본적으로 이 해결법의 문제는 불필요한 요청을 계속 보낸다. 또한 was가 cpu가 증가하면 ec2가 생성되는 auto scailing 구조에서는 api call 로직을 그때마다 수정해야 된다는 문제가 발생을 하였습니다.
그래서 나는 MySQL Binlog를 사용을 하여 문제를 했다. 결국 근본적으로 Mysql에서 특정 쿼리가 발생을 하였을 때 또는 상태가 변하였을 때를 스프링이 감지할 수 있게 된다면 문제가 해결할 수 있다고 생각했다.
단 Select는 저장하지 않기 때문에 이벤트를 탐지를 할 수 없다.
# 바이너리로 보기
cat [바이너리파일]
# mysqlbinlog
mysqlbinlog [바이너리파일] [옵션]
setInterval
로 10초마다 재시작을 하여 실시간을 처리를 하려고함 (이때 1번당 쿼리는 최소 3개)binlog에는 select 이벤트는 없다.
insert into board (title, content, writer) values ('111', '111', '111t');
//binlog
implementation 'com.github.shyiko:mysql-binlog-connector-java:0.21.0'
@EnableConfigurationProperties(BinlogConfiguration.class)
@SpringBootApplication
public class MybatisApplication {
public static void main(String[] args) {
SpringApplication.run(MybatisApplication.class, args);
}
}
@Getter
@Setter
@ConfigurationProperties(prefix = "binlog")
public class BinlogConfiguration{
/** DB host(ipv4) */
@Value("${binlog.host}")
private String host;
/** DB port */
@Value("${binlog.port}")
private int port;
/** DB username */
@Value("${binlog.user}")
private String user;
/** DB password */
@Value("${binlog.password}")
private String password;
@Bean
BinaryLogClient binaryLogClient(){
BinaryLogClient binaryLogClient = new BinaryLogClient(
host,
port,
user,
password);
// 받은 데이터를 BYTE 로 표현
EventDeserializer eventDeserializer = new EventDeserializer();
eventDeserializer.setCompatibilityMode(
EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY
);
binaryLogClient.setEventDeserializer(eventDeserializer);
return binaryLogClient;
}
}
@Component
public class BinlogEventRunner implements ApplicationRunner, BinaryLogClient.EventListener {
private final Logger logger = LoggerFactory.getLogger(this.getClass().getSimpleName());
private final BinaryLogClient binaryLogClient;
private final ObjectMapper objectMapper;
private final ReloadEventPublisher publisher;
public BinlogEventRunner(BinaryLogClient binaryLogClient, ObjectMapper objectMapper, ReloadEventPublisher publisher) {
this.binaryLogClient = binaryLogClient;
this.objectMapper = objectMapper;
this.publisher = publisher;
}
private static int TABLE_ID = 0;
@Override
public void run(ApplicationArguments args) throws Exception {
try {
setBinlogClient();
} catch (Exception e) {
logger.info(" ====== /BinlogEventRunner [" + getClass().getSimpleName() + ".run()] exitBinlogClient Exception : ", e);
exitBinlogClient();
}
}
@Override
public void onEvent(Event event) {
String eventStringInfo = "";
String tableName = "";
try {
eventStringInfo = objectMapper.writeValueAsString(event);
} catch (Exception e) {
logger.info(" ====== /BinlogEventRunner [" + getClass().getSimpleName() + ".onEvent()] exception : ", e);
}
if (event.getHeader().getEventType() == EventType.TABLE_MAP) {
TableMapEventData tableMapEventData = event.getData();
tableName = tableMapEventData.getTable();
if (Objects.equals(tableMapEventData.getDatabase(), "db이름 작성") && Objects.equals(tableName, "테이블 이름")) {
logger.info(" ====== /BinlogEventRunner [" + getClass().getSimpleName() + ".onEvent()] tableMap : ", eventStringInfo);
JsonNode rootNode = null;
try {
rootNode = objectMapper.readTree(eventStringInfo);
} catch (JsonProcessingException e) {
logger.info(" ====== /BinlogEventRunner [" + getClass().getSimpleName() + ".onEvent()] exception : ", e);
}
JsonNode dataNode = rootNode.get("data");
TABLE_ID = dataNode.get("tableId").asInt();
}
}
if (event.getHeader().getEventType() == EventType.EXT_WRITE_ROWS) {
JsonNode writeNode = null;
try {
String writeValues = objectMapper.writeValueAsString(event);
writeNode = objectMapper.readTree(writeValues);
} catch (Exception e) {
logger.info(" ====== /BinlogEventRunner [" + getClass().getSimpleName() + ".onEvent()] exception : ", e);
}
JsonNode dataNode = writeNode.get("data");
int writeTableId = dataNode.get("tableId").asInt();
if (writeTableId == TABLE_ID) {
WriteRowsEventData data = event.getData();/
Serializable[] s_data = data.getRows().get(0);
for (Serializable sDatum : s_data) {
if (sDatum instanceof byte[]) {
byte[] byteData = (byte[]) sDatum;
String stringData = new String(byteData, StandardCharsets.UTF_8);
logger.info(" ====== /BinlogEventRunner [" + getClass().getSimpleName() + ".onEvent()] stringData : " + stringData);
}
}
try {
LadderCondition ladderConditionIsSessionN = ladderDAO.getLadderConditionIsSessionN();
logger.info(" ====== /BinlogEventRunner [" + getClass().getSimpleName() + ".try onEvnet()] : ", objectMapper.writeValueAsString(ladderConditionIsSessionN));
publisher.sessionReloadEvent(objectMapper.writeValueAsString(ladderConditionIsSessionN));
}catch (Exception e){
logger.info(" ====== /BinlogEventRunner [" + getClass().getSimpleName() + ".onEvent()] exception : ", e);
}
}
}
}
private void setBinlogClient() throws IOException, TimeoutException {
binaryLogClient.setServerId(binaryLogClient.getServerId() - 1);
binaryLogClient.setKeepAlive(true);
binaryLogClient.registerEventListener(this);
binaryLogClient.connect(5000);
}
private void exitBinlogClient() throws IOException {
try {
binaryLogClient.unregisterEventListener(this);
} finally {
binaryLogClient.disconnect();
}
}
}
[ 해결 ]
[ 부족한 부분 ]
현재는 binlog를 통해서 전체 was가 event를 발생하는데 만약에 확장을 하게 된다면 이 부분은 추가적인 리펙토링을 해야된다. 현재 상황에서는 최대한의 성과를 만들기 위해서 노력을 하였지만 프로젝트가 끝나고 회고를 하면서 이 부분이 추후에 기술적인 부채가 되지 않을까 고민이 되었다.
새로운 기술에 대한 trade off를 개인적으로 고려를 하여 현재 방식을 선택을 하였지만 더 좋은 방법, 비즈니스로 문제를 풀 수 없었을까? 아쉬움이 남는다.
https://tecoble.techcourse.co.kr/post/2022-10-11-server-sent-events/
https://github.com/osheroff/mysql-binlog-connector-java