외부 api에서 db에 새로운 데이터가 저장됨을 감지하고 실시간으로 내보내주는 기능을 만들기 위한 테스트해봤음
SPRING에서 DB 변동감지하는 방법 검색하다가 postgres의 notify/listen 이용했음
DB에 특정테이블에 새로운 로우가 추가될 때 트리거 이벤트를 실행하고 Spring에서 해당 이벤트를 Listen하고 Server-Sent Events(SSE)를 통해 내보낸다.
CREATE OR REPLACE FUNCTION public.cctv_trigger_function()
RETURNS trigger
LANGUAGE plpgsql
AS $function$
BEGIN
PERFORM pg_notify(cast('new_data_inserted' as text) , row_to_json(NEW)::text); -- 이벤트 이름과,데이터 전달
RETURN NULL;
END;
$function$;
pg_notify
함수 PostgreSQL의 LISTEN/NOTIFY 이용하여 "new_data_inserted"라는 이벤트를 발생시키고, 해당 이벤트와 함께 새로운 데이터를 JSON 형식으로 전달
채널명 : new_data_inserted
새로 INSERT되는 row를 json으로 내보냄
CREATE TRIGGER after_insert_trigger
AFTER INSERT ON public.cctv_trans_data
FOR EACH ROW
EXECUTE FUNCTION cctv_trigger_function();
@Component
public class SseEmitterManager {
private final CopyOnWriteArrayList<SseEmitter> emitters = new CopyOnWriteArrayList<>();
public SseEmitter createEmitter(HttpServletRequest request) {
System.out.println("====SSE stream 접근 : " + request.getRemoteAddr());
SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);
this.emitters.add(emitter);
try {
emitter.send(SseEmitter.event()
.name("connect")
.data("connected!", MediaType.APPLICATION_JSON));
} catch (IOException e) {
throw new RuntimeException(e);
}
emitter.onCompletion(() -> this.emitters.remove(emitter));
emitter.onTimeout(() -> this.emitters.remove(emitter));
return emitter;
}
public CopyOnWriteArrayList<SseEmitter> getEmitters() {
return this.emitters;
}
}
PostgresNotificationListener
생성public Runnable createNotificationHandler(Consumer<PGNotification> consumer) {
return() -> {
jdbcTemplate.execute((Connection c) -> {
try{
c.createStatement().execute("LISTEN new_data_inserted");
System.out.println("====LISTEN====");
PGConnection pgconn = c.unwrap(PGConnection.class);
while(!Thread.currentThread().isInterrupted()) {
PGNotification[] nts = pgconn.getNotifications(10000);
if(nts ==null|| nts.length == 0) {
continue;
}
for(PGNotification nt : nts) {
consumer.accept(nt);
}
}
}catch(SQLException e) {
e.printStackTrace();
}
return0;
});
};
}
PostgreSQL의 LISTEN/NOTIFY 기능을 사용하여 데이터베이스에서 보낸 NOTIFY를 감지
Consumer
: PGNotification 객체를 입력으로 받아서 해당 객체에 대한 처리를 수행jdbcTemplate
: 데이터베이스 커넥션 후 "LISTEN new_data_inserted"라는 명령을 실행하여 DB에서 "new_data_inserted"라는 이벤트를 구독함PGConnection
: 10초마다 DB이벤트를 가져옴. 이벤트가 발생하지 않을 경우에는 대기 이벤트가 발생하면 PGNotification 객체들의 배열을 가져오고, 각각 PGNotification에 대해 consumer.accept(nt)
를 호출하여 입력받은 Consumer에 해당 이벤트를 전달Runnable
을 반환한다. 해당 메서드가 호출되면 이벤트 핸들링을 담당하는 쓰레드를 생성하고, 그 쓰레드가 실행되는 로직을 Runnable 객체로 래핑하여 반환.@PostConstruct
public void init() {
Runnable notificationHandler = createNotificationHandler((PGNotification notification) -> {
System.out.println("=====Received noti: " + notification.getName() + ", payload: " + notification.getParameter() + "=====");
// sse 로 내보내는 로직 추가
String payload = notification.getParameter();
// 각 emitter에 payload를 전송
sseEmitterManager.getEmitters().forEach(sseEmitter -> {
try {
sseEmitter.send(payload, MediaType.APPLICATION_JSON);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
});
Thread thread = new Thread(notificationHandler);
thread.start();
}
createNotificationHandler()
메서드로 notificationHandler
라는 Runnable 객체를 생성sseEmitterManager
를 통해 관리되는 모든 SSE emitter에게 payload를 전달getEmitters()
현재 활성화된 모든 SSE emitter를 반환@GetMapping(value ="/connect", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter handle(HttpServletRequest request) {
return sseEmitterManager.createEmitter(request);
}
/connect
진입 시 sseEmitter 객체 생성
const eventSource = new EventSource("/connect")
eventSource.onmessage = event => {
const p = document.createElement("p")
p.innerText = event.data
document.getElementById("messages").appendChild(p)
}
SSE 연결 후
디비버로 DB에 INSERT하면 화면에 데이터 출력됨