[DB] INSERT Trigger → [Spring] SSE로 응답

Hyunji·2024년 2월 15일
0

외부 api에서 db에 새로운 데이터가 저장됨을 감지하고 실시간으로 내보내주는 기능을 만들기 위한 테스트해봤음

SPRING에서 DB 변동감지하는 방법 검색하다가 postgres의 notify/listen 이용했음

DB에 특정테이블에 새로운 로우가 추가될 때 트리거 이벤트를 실행하고 Spring에서 해당 이벤트를 Listen하고 Server-Sent Events(SSE)를 통해 내보낸다.

TRIGGER 생성

  1. 트리거 함수를 만든다.
CREATE OR REPLACE FUNCTION public.cctv_trigger_function()
 RETURNS trigger
 LANGUAGE plpgsql
AS $function$
BEGIN
    PERFORM pg_notify(cast('new_data_inserted' as text) , row_to_json(NEW)::text); -- 이벤트 이름과,데이터 전달
    RETURN NULL;
END;
$function$;

pg_notify 함수 PostgreSQL의 LISTEN/NOTIFY 이용하여 "new_data_inserted"라는 이벤트를 발생시키고, 해당 이벤트와 함께 새로운 데이터를 JSON 형식으로 전달

채널명 : new_data_inserted

새로 INSERT되는 row를 json으로 내보냄

  1. cctv_trans_data 테이블에 INSERT시 위에서 생성한 함수를 실행하는 트리거를 만든다.
CREATE TRIGGER after_insert_trigger
AFTER INSERT ON public.cctv_trans_data 
FOR EACH ROW
EXECUTE FUNCTION cctv_trigger_function();

SSE 객체 생성 클래스

@Component
public class SseEmitterManager {

    private final CopyOnWriteArrayList<SseEmitter> emitters = new CopyOnWriteArrayList<>();

    public SseEmitter createEmitter(HttpServletRequest request) {
        System.out.println("====SSE stream 접근 : " + request.getRemoteAddr());
        SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);
        this.emitters.add(emitter);
        try {
            emitter.send(SseEmitter.event()
                    .name("connect")
                    .data("connected!", MediaType.APPLICATION_JSON));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        emitter.onCompletion(() -> this.emitters.remove(emitter));
        emitter.onTimeout(() -> this.emitters.remove(emitter));

        return emitter;
    }

    public CopyOnWriteArrayList<SseEmitter> getEmitters() {
        return this.emitters;
    }

}

PostgresNotificationListener 생성

  1. LISTEN 핸들러 메서드
public Runnable createNotificationHandler(Consumer<PGNotification> consumer) {
	return() -> {
	        jdbcTemplate.execute((Connection c) -> {
	try{
	                c.createStatement().execute("LISTEN new_data_inserted");
	                System.out.println("====LISTEN====");
	
	                PGConnection pgconn = c.unwrap(PGConnection.class);
	while(!Thread.currentThread().isInterrupted()) {
	                    PGNotification[] nts = pgconn.getNotifications(10000);
	if(nts ==null|| nts.length == 0) {
	continue;
	                    }
	for(PGNotification nt : nts) {
	                        consumer.accept(nt);
	                    }
	                }
	            }catch(SQLException e) {
	                e.printStackTrace();
	            }
	return0;
	        });
	    };
	}

PostgreSQL의 LISTEN/NOTIFY 기능을 사용하여 데이터베이스에서 보낸 NOTIFY를 감지

  • Consumer : PGNotification 객체를 입력으로 받아서 해당 객체에 대한 처리를 수행
  • jdbcTemplate : 데이터베이스 커넥션 후 "LISTEN new_data_inserted"라는 명령을 실행하여 DB에서 "new_data_inserted"라는 이벤트를 구독함
  • PGConnection : 10초마다 DB이벤트를 가져옴. 이벤트가 발생하지 않을 경우에는 대기 이벤트가 발생하면 PGNotification 객체들의 배열을 가져오고, 각각 PGNotification에 대해 consumer.accept(nt)를 호출하여 입력받은 Consumer에 해당 이벤트를 전달
  • Runnable 을 반환한다. 해당 메서드가 호출되면 이벤트 핸들링을 담당하는 쓰레드를 생성하고, 그 쓰레드가 실행되는 로직을 Runnable 객체로 래핑하여 반환.
  1. @PostConstruct 초기화 메서드
@PostConstruct
    public void init() {
        Runnable notificationHandler = createNotificationHandler((PGNotification notification) -> {
            System.out.println("=====Received noti: " + notification.getName() + ", payload: " + notification.getParameter() + "=====");
            // sse 로 내보내는 로직 추가
            String payload = notification.getParameter();
            // 각 emitter에 payload를 전송
            sseEmitterManager.getEmitters().forEach(sseEmitter -> {
                try {
                    sseEmitter.send(payload, MediaType.APPLICATION_JSON);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            });
        });
        Thread thread = new Thread(notificationHandler);
        thread.start();
    }
  • createNotificationHandler() 메서드로 notificationHandler라는 Runnable 객체를 생성
  • 알림을 받으면 해당 알림의 이름과 payload를 출력하고 sseEmitterManager를 통해 관리되는 모든 SSE emitter에게 payload를 전달
  • getEmitters() 현재 활성화된 모든 SSE emitter를 반환

Controller

@GetMapping(value ="/connect", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter handle(HttpServletRequest request) {
        return sseEmitterManager.createEmitter(request);
    }

/connect 진입 시 sseEmitter 객체 생성

Client (javascript)

const eventSource = new EventSource("/connect")
        eventSource.onmessage = event => {
            const p = document.createElement("p")
            p.innerText = event.data

            document.getElementById("messages").appendChild(p)
        }

SSE 연결 후

디비버로 DB에 INSERT하면 화면에 데이터 출력됨

profile
ㅎㅇ

0개의 댓글