Supabase를 통한 DM 기능 구현

디듀·2026년 2월 8일

목표

  • 유저 A가 유저 B에게 DM을 보낸다
  • 하나의 대화방이 생성되고, 만약 이미 존재하는 대화방이 있다면 기존에 나누었던 대화 목록이 나타난다
  • 새 메시지는 실시간으로 전송되고 읽음 처리도 가능하다

테이블

converstations: A와 B가 주고받는 메시지 묶음

컬럼명설명타입
idprimary keyint8
create_at대화방이 생성된 시간timestamp
last_message_at가장 최근에 메시지를 보낸 시간timestamp
pair_key대화방 중복 생성을 막기 위한 unique keytext

conversation_participants: 대화방에 참여 중인 유저

  • conversation_id와 user_id를 primary key로 사용
컬럼명설명타입
conversation_id참여하고 있는 대화방의 아이디int8
user_id참여 중인 유저의 아이디uuid
created_at대화방에 참여한 시간timestamp
last_read_at마지막에 메시지를 읽은 시간timstamp

messages: 유저가 상대방에게 전송한 메시지

컬럼명설명타입
id메시지의 아이디int8
conversation_id메시지가 속한 대화방의 아이디int8
sender_id메시지를 보낸 유저의 아이디uuid
content메시지 내용text
created_at메시지 전송 시간timestamp

대화방 중복 생성 문제

pair_key 생성

  • 내가 상대방에게 처음 메시지를 보낼 때 방을 생성한다
  • 그런데 상대방도 내게 처음 메시지를 보낼 때 방을 생성한다
  • 이렇게 되면 두 명의 유저가 동시에 DM을 보냈을 때 방이 중복 생성될 수 있으므로 문제가 발생하고, 추가적으로 추후에 n명이 존재할 수 있는 대화방이 생겼을 때 1:1 대화방과 구분할 수 있는 방법이 없다.
  • 따라서 pair_key라는, 두 유저 아이디의 조합을 생성한다 (아이디는 오름차순으로 정렬)
alter table public.conversations
add column pair_key text unique;

-- pair_key 예: 'uuidA:uuidB'

RPC 생성

  • Supabase의 RPC는, DB 안에 미리 만들어 둔 함수를 프론트에서 API처럼 호출하는 것이다. 이를 통해 여러 개의 작업을 한 번에 처리할 수 있도록 한다
  • DM을 열 때 매번 하는 “기존 방 찾기 ⇒ 없으면 생성 ⇒ 유저 두 명을 participants에 insert ⇒ 해당하는 대화방의 아이디를 리턴”이라는 과정을 Supabase의 RPC로 정의하여 프론트에서 한 번의 호출만으로 일련의 과정을 처리할 수 있게 한다
  • 이러한 RPC를 사용하지 않으면, 위에서 작성한 일련의 과정을 프론트에서 여러 개의 쿼리를 통해 실행시켜야 하기 때문에 동시성 문제가 발생할 위험이 있다
  • 작성한 RPC의 내용은 아래와 같음
create or replace function public.get_or_create_dm(other_user_id uuid)
returns int8
language plpgsql
security definer
as $$
declare
  me uuid := auth.uid();
  a text;
  b text;
  key text;
  cid int8;
begin
  if me is null then
    raise exception 'not authenticated';
  end if;

  a := me::text;
  b := other_user_id::text;

  if a < b then key := a || ':' || b;
  else key := b || ':' || a;
  end if;

  select id into cid
  from public.conversations
  where pair_key = key;

  if cid is not null then
    return cid;
  end if;

  insert into public.conversations (pair_key)
  values (key)
  returning id into cid;

  insert into public.conversation_participants (conversation_id, user_id)
  values (cid, me), (cid, other_user_id);

  return cid;
end;
$$;
  • 클라이언트에서 RPC 실행
export const fetchOrCreateDm = async (userId: string) => {
  const { data, error } = await supabase.rpc("get_or_create_dm", {
    other_user_id: userId,
  });

  if (error) throw error;
  return data;
};

실시간 메시지 전송

  • 이번에도 이전의 앱 내 알림과 마찬가지로 Supabase의 Realtime을 사용한다. 사용한 방식은 거의 동일하다
  • 대신 이전의 리스트와 다른 점은, 채팅방이기 때문에 최신 메시지가 가장 아래에 위치하고 스크롤의 맨 위에 도달했을 때 그 이전의 리스트를 추가로 불러와야 한다는 점
  • 따라서 새로운 메시지를 수신했을 때 그 메시지를 맨 첫 번째 페이지에 추가한다

publication에 messages 테이블 추가

alter publication supabase_realtime
add table public.messages;

메시지 채널 구독

  • 메시지 테이블에 현재 채팅방과 동일한 아이디를 가지고 있는 로우가 추가되었을 경우 알림 전달
export const subscribeMessages = (
  conversationId: number,
  onInsert: (message: MessageEntity) => void
) => {
  const channel = supabase
    .channel(`messages:${conversationId}`)
    .on(
      "postgres_changes",
      {
        event: "INSERT",
        schema: "public",
        table: "messages",
        filter: `conversation_id=eq.${conversationId}`,
      },
      (payload) => {
        onInsert(payload.new as MessageEntity);
      }
    )
    .subscribe();

  return () => {
    supabase.removeChannel(channel);
  };
};

컴포넌트 내 쿼리 데이터 처리

  • 채팅 방 입장 시 실제로 WebSocket을 연결하고, 메시지 수신 이후의 처리에 대해 함수 작성
  • 내가 보낸 것이든, 다른 유저가 보낸 것이든 메시지를 수신하여 전달 받은 메시지를 가장 첫 번째 페이지에 추가함
  • limit를 이용한 페이지네이션이 아니라 cursor 방식을 사용하고 있으므로 데이터가 꼬일 염려 X
  const updateQueryData = (message: MessageEntity) => {
    const conversationId = message.conversation_id;

    queryClient.setQueryData<InfiniteData<PageData<MessageEntity>>>(
      QUERY_KEYS.dm.conversation(conversationId),
      (prevMessages) => {
        if (!prevMessages) throw new Error("메시지가 존재하지 않습니다.");

        const exists = prevMessages.pages.some((page) =>
          page.items.some((item) => item.id === message.id)
        );
        if (exists) return prevMessages;

        const first = prevMessages.pages[0];

        const nextFirst = {
          ...first,
          items: [message, ...first.items],
        };

        return {
          ...prevMessages,
          pages: [nextFirst, ...prevMessages.pages.slice(1)],
        };
      }
    );
  };

유저가 채팅방 밖에 있을 때 수신 처리

  • 위에서는 특정 대화방에 대한 이벤트를 구독했고, 유저가 다른 화면에 있을 때 어떤 채팅방이든 메시지를 수신했다면 DM 메뉴에 뱃지 표시를 할 수 있도록 처리

메시지 채널 구독

  • 메시지의 sender_id(보낸 사람의 user_id)가 내 것과 다를 때 메시지를 수신 (위의 이벤트와 조건이 다르다)
  • Supabase의 RLS 정책 덕에 내가 아닌 다른 유저에게 전송된 메시지는 애초에 select를 할 수 없으므로, 나에게 수신된 메시지에 대해서만 이벤트를 받을 수 있음
export const subscribeIncomingDm = ({
  userId,
  onIncoming,
}: {
  userId: string;
  onIncoming: (message: MessageEntity) => void;
}) => {
  const channel = supabase
    .channel(`dm-incoming:${userId}`)
    .on(
      "postgres_changes",
      {
        event: "INSERT",
        schema: "public",
        table: "messages",
        filter: `sender_id=neq.${userId}`,
      },
      (payload) => {
        const msg = payload.new as MessageEntity;

        onIncoming(msg);
      }
    )
    .subscribe();

  return () => {
    supabase.removeChannel(channel);
  };
};

읽지 않은 메시지가 존재하는지 조회하는 API

  • Supabase에서 RPC를 생성하여 여러 개의 조건을 복잡하게 실행할 필요성을 줄임
export const fetchHasUnreadDm = async () => {
  const { data, error } = await supabase.rpc("has_unread_dm");

  if (error) throw error;
  return data ?? false;
};
  • useQuery로 훅을 생성하여 메시지를 수신할 경우 hasUnread 관련 쿼리 키를 invalidate
  • 해당 데이터를 사용하고 있는 nav 컴포넌트에서 최신 데이터를 다시 받아오도록 처리
  • dm 리스트도 마찬가지로 재호출
const unsubscribeDm = subscribeIncomingDm({
  userId: session.user.id,
  onIncoming: () => {
    queryClient.invalidateQueries({
      queryKey: QUERY_KEYS.dm.hasUnread,
    });

    queryClient.invalidateQueries({
      queryKey: QUERY_KEYS.dm.list,
    });
  },
});

메시지 읽음 처리

  • 내가 이 메시지방에 참여한 유저로서 저장되어 있는 conversation_participants의 last_read_at 컬럼을 업데이트 한다
  • 마지막으로 메시지가 전송된 시간 > 마지막으로 메시지를 읽은 시간이라면 읽지 않은 메시지가 존재한다는 뜻이므로 이런 식으로 읽음 여부 + 읽지 않은 메시지의 개수도 계산할 수 있다

클라이언트에서 호출하는 API

  • 그런데 이 코드에서 한 가지 문제가 있었던 것이, 내 컴퓨터의 시간이 실제 시간보다 약 3분 정도 빠르게 설정되어 있어서 읽지 않은 메시지도 읽었다고 표기되는 경우가 있었다
    • new Date()로 시간을 생성해서 그런 것으로 보인다
  • 컴퓨터 시간이 아니라 서버 시간으로 표시하는 로직이 필요할 것 같다
export const markDmAsRead = async ({
  conversationId,
}: {
  conversationId: number;
}) => {
  const { data, error } = await supabase
    .from("conversation_participants")
    .update({ last_read_at: new Date().toISOString() })
    .eq("conversation_id", conversationId)
    .select()
    .single();

  if (error) throw error;
  return data;
};

  • 이후에는 메시지방을 들어왔을 때 읽음 처리를 하는 로직이 있는데, 메시지 방에 이미 들어온 상태에서 새로운 메시지가 수신되었을 때 읽음 처리가 정상적으로 이루어지지 않는 문제가 있었다.
  • 메시지를 수신했을 때 최신 메시지가 화면에 보였는지 여부에 따라 읽음 처리를 실행하도록 바꿔야 할 필요성을 느꼈다. 어쨌든 아래와 같이 정상적으로 구현된 것을 확인. (+ dm 방 삭제 등에 대한 기능도 추가 필요함)

profile
세상에서 가장 부지런한 사람이 되고 싶은 게으름뱅이

0개의 댓글