목표
- 유저 A가 유저 B에게 DM을 보낸다
- 하나의 대화방이 생성되고, 만약 이미 존재하는 대화방이 있다면 기존에 나누었던 대화 목록이 나타난다
- 새 메시지는 실시간으로 전송되고 읽음 처리도 가능하다
테이블
converstations: A와 B가 주고받는 메시지 묶음
| 컬럼명 | 설명 | 타입 |
|---|
| id | primary key | int8 |
| create_at | 대화방이 생성된 시간 | timestamp |
| last_message_at | 가장 최근에 메시지를 보낸 시간 | timestamp |
| pair_key | 대화방 중복 생성을 막기 위한 unique key | text |
conversation_participants: 대화방에 참여 중인 유저
- conversation_id와 user_id를 primary key로 사용
| 컬럼명 | 설명 | 타입 |
|---|
| conversation_id | 참여하고 있는 대화방의 아이디 | int8 |
| user_id | 참여 중인 유저의 아이디 | uuid |
| created_at | 대화방에 참여한 시간 | timestamp |
| last_read_at | 마지막에 메시지를 읽은 시간 | timstamp |
messages: 유저가 상대방에게 전송한 메시지
| 컬럼명 | 설명 | 타입 |
|---|
| id | 메시지의 아이디 | int8 |
| conversation_id | 메시지가 속한 대화방의 아이디 | int8 |
| sender_id | 메시지를 보낸 유저의 아이디 | uuid |
| content | 메시지 내용 | text |
| created_at | 메시지 전송 시간 | timestamp |
대화방 중복 생성 문제
pair_key 생성
- 내가 상대방에게 처음 메시지를 보낼 때 방을 생성한다
- 그런데 상대방도 내게 처음 메시지를 보낼 때 방을 생성한다
- 이렇게 되면 두 명의 유저가 동시에 DM을 보냈을 때 방이 중복 생성될 수 있으므로 문제가 발생하고, 추가적으로 추후에 n명이 존재할 수 있는 대화방이 생겼을 때 1:1 대화방과 구분할 수 있는 방법이 없다.
- 따라서
pair_key라는, 두 유저 아이디의 조합을 생성한다 (아이디는 오름차순으로 정렬)
alter table public.conversations
add column pair_key text unique;
-- pair_key 예: 'uuidA:uuidB'
RPC 생성
- Supabase의 RPC는, DB 안에 미리 만들어 둔 함수를 프론트에서 API처럼 호출하는 것이다. 이를 통해 여러 개의 작업을 한 번에 처리할 수 있도록 한다
- DM을 열 때 매번 하는 “기존 방 찾기 ⇒ 없으면 생성 ⇒ 유저 두 명을 participants에 insert ⇒ 해당하는 대화방의 아이디를 리턴”이라는 과정을 Supabase의 RPC로 정의하여 프론트에서 한 번의 호출만으로 일련의 과정을 처리할 수 있게 한다
- 이러한 RPC를 사용하지 않으면, 위에서 작성한 일련의 과정을 프론트에서 여러 개의 쿼리를 통해 실행시켜야 하기 때문에 동시성 문제가 발생할 위험이 있다
- 작성한 RPC의 내용은 아래와 같음
create or replace function public.get_or_create_dm(other_user_id uuid)
returns int8
language plpgsql
security definer
as $$
declare
me uuid := auth.uid();
a text;
b text;
key text;
cid int8;
begin
if me is null then
raise exception 'not authenticated';
end if;
a := me::text;
b := other_user_id::text;
if a < b then key := a || ':' || b;
else key := b || ':' || a;
end if;
select id into cid
from public.conversations
where pair_key = key;
if cid is not null then
return cid;
end if;
insert into public.conversations (pair_key)
values (key)
returning id into cid;
insert into public.conversation_participants (conversation_id, user_id)
values (cid, me), (cid, other_user_id);
return cid;
end;
$$;
export const fetchOrCreateDm = async (userId: string) => {
const { data, error } = await supabase.rpc("get_or_create_dm", {
other_user_id: userId,
});
if (error) throw error;
return data;
};
실시간 메시지 전송
- 이번에도 이전의 앱 내 알림과 마찬가지로 Supabase의 Realtime을 사용한다. 사용한 방식은 거의 동일하다
- 대신 이전의 리스트와 다른 점은, 채팅방이기 때문에 최신 메시지가 가장 아래에 위치하고 스크롤의 맨 위에 도달했을 때 그 이전의 리스트를 추가로 불러와야 한다는 점
- 따라서 새로운 메시지를 수신했을 때 그 메시지를 맨 첫 번째 페이지에 추가한다
publication에 messages 테이블 추가
alter publication supabase_realtime
add table public.messages;
메시지 채널 구독
- 메시지 테이블에 현재 채팅방과 동일한 아이디를 가지고 있는 로우가 추가되었을 경우 알림 전달
export const subscribeMessages = (
conversationId: number,
onInsert: (message: MessageEntity) => void
) => {
const channel = supabase
.channel(`messages:${conversationId}`)
.on(
"postgres_changes",
{
event: "INSERT",
schema: "public",
table: "messages",
filter: `conversation_id=eq.${conversationId}`,
},
(payload) => {
onInsert(payload.new as MessageEntity);
}
)
.subscribe();
return () => {
supabase.removeChannel(channel);
};
};
컴포넌트 내 쿼리 데이터 처리
- 채팅 방 입장 시 실제로 WebSocket을 연결하고, 메시지 수신 이후의 처리에 대해 함수 작성
- 내가 보낸 것이든, 다른 유저가 보낸 것이든 메시지를 수신하여 전달 받은 메시지를 가장 첫 번째 페이지에 추가함
- limit를 이용한 페이지네이션이 아니라 cursor 방식을 사용하고 있으므로 데이터가 꼬일 염려 X
const updateQueryData = (message: MessageEntity) => {
const conversationId = message.conversation_id;
queryClient.setQueryData<InfiniteData<PageData<MessageEntity>>>(
QUERY_KEYS.dm.conversation(conversationId),
(prevMessages) => {
if (!prevMessages) throw new Error("메시지가 존재하지 않습니다.");
const exists = prevMessages.pages.some((page) =>
page.items.some((item) => item.id === message.id)
);
if (exists) return prevMessages;
const first = prevMessages.pages[0];
const nextFirst = {
...first,
items: [message, ...first.items],
};
return {
...prevMessages,
pages: [nextFirst, ...prevMessages.pages.slice(1)],
};
}
);
};
유저가 채팅방 밖에 있을 때 수신 처리
- 위에서는 특정 대화방에 대한 이벤트를 구독했고, 유저가 다른 화면에 있을 때 어떤 채팅방이든 메시지를 수신했다면 DM 메뉴에 뱃지 표시를 할 수 있도록 처리
메시지 채널 구독
- 메시지의 sender_id(보낸 사람의 user_id)가 내 것과 다를 때 메시지를 수신 (위의 이벤트와 조건이 다르다)
- Supabase의 RLS 정책 덕에 내가 아닌 다른 유저에게 전송된 메시지는 애초에 select를 할 수 없으므로, 나에게 수신된 메시지에 대해서만 이벤트를 받을 수 있음
export const subscribeIncomingDm = ({
userId,
onIncoming,
}: {
userId: string;
onIncoming: (message: MessageEntity) => void;
}) => {
const channel = supabase
.channel(`dm-incoming:${userId}`)
.on(
"postgres_changes",
{
event: "INSERT",
schema: "public",
table: "messages",
filter: `sender_id=neq.${userId}`,
},
(payload) => {
const msg = payload.new as MessageEntity;
onIncoming(msg);
}
)
.subscribe();
return () => {
supabase.removeChannel(channel);
};
};
읽지 않은 메시지가 존재하는지 조회하는 API
- Supabase에서 RPC를 생성하여 여러 개의 조건을 복잡하게 실행할 필요성을 줄임
export const fetchHasUnreadDm = async () => {
const { data, error } = await supabase.rpc("has_unread_dm");
if (error) throw error;
return data ?? false;
};
- useQuery로 훅을 생성하여 메시지를 수신할 경우 hasUnread 관련 쿼리 키를 invalidate
- 해당 데이터를 사용하고 있는 nav 컴포넌트에서 최신 데이터를 다시 받아오도록 처리
- dm 리스트도 마찬가지로 재호출
const unsubscribeDm = subscribeIncomingDm({
userId: session.user.id,
onIncoming: () => {
queryClient.invalidateQueries({
queryKey: QUERY_KEYS.dm.hasUnread,
});
queryClient.invalidateQueries({
queryKey: QUERY_KEYS.dm.list,
});
},
});
메시지 읽음 처리
- 내가 이 메시지방에 참여한 유저로서 저장되어 있는 conversation_participants의 last_read_at 컬럼을 업데이트 한다
- 마지막으로 메시지가 전송된 시간 > 마지막으로 메시지를 읽은 시간이라면 읽지 않은 메시지가 존재한다는 뜻이므로 이런 식으로 읽음 여부 + 읽지 않은 메시지의 개수도 계산할 수 있다
클라이언트에서 호출하는 API
- 그런데 이 코드에서 한 가지 문제가 있었던 것이, 내 컴퓨터의 시간이 실제 시간보다 약 3분 정도 빠르게 설정되어 있어서 읽지 않은 메시지도 읽었다고 표기되는 경우가 있었다
- new Date()로 시간을 생성해서 그런 것으로 보인다
- 컴퓨터 시간이 아니라 서버 시간으로 표시하는 로직이 필요할 것 같다
export const markDmAsRead = async ({
conversationId,
}: {
conversationId: number;
}) => {
const { data, error } = await supabase
.from("conversation_participants")
.update({ last_read_at: new Date().toISOString() })
.eq("conversation_id", conversationId)
.select()
.single();
if (error) throw error;
return data;
};
- 이후에는 메시지방을 들어왔을 때 읽음 처리를 하는 로직이 있는데, 메시지 방에 이미 들어온 상태에서 새로운 메시지가 수신되었을 때 읽음 처리가 정상적으로 이루어지지 않는 문제가 있었다.
- 메시지를 수신했을 때 최신 메시지가 화면에 보였는지 여부에 따라 읽음 처리를 실행하도록 바꿔야 할 필요성을 느꼈다. 어쨌든 아래와 같이 정상적으로 구현된 것을 확인. (+ dm 방 삭제 등에 대한 기능도 추가 필요함)
