WebRTC P2P방식에서 SFU방식으로 전환하기

홍석주·2024년 9월 25일
1

Kwitch

목록 보기
2/6

WebRTC를 다룰 때, 표준의 흐름을 따른다면 RTCPeerConnection을 이용해 P2P 연결을 하는 것이 일반적이다. 다만 이는 1대1 연결에서 유리한데 현재 구현하고 있는 '크위치'에서는 방송인과 시청자들이 1대N으로 연결되기 때문에 방송인 측에서 커넥션이 너무 많아져 5명정도만 방송에 참여해도 방송의 질이 많이 하락한다. 따라서 SFU 방식으로 전환해보려고 한다.

SFU 방식의 장점

서버의 종류
SFU 방식은 종단 간 미디어 트래픽을 중계하는 중앙 서버 방식으로 Mesh(P2P) 방식과 다르게 모든 클라이언트가 서로 연결되지 않기 때문에 클라이언트는 여러개의 연결을 유지할 필요 없이 서버만 peer로 연결되어 전달하면 된다. 데이터가 서버를 거치고 돌아오기 때문에 P2P 방식보다 느리지만 비슷한 수준의 실시간성을 유지할 수 있고, 클라이언트의 부하가 줄어든다.

크위치에선?

크위치의 연결 방식
크위치에선 화상 채팅처럼 모든 사용자가 영상을 전송할 필요가 없기 때문에 방송인은 서버에 전송만, 시청자는 수신만 하면 된다. 위의 방식을 들었을 때 예상되듯이 서버에서 모든 트래픽을 전달해 주어야 하기 때문에 서버 비용이 증가하지만, 모든 사용자의 영상을 받는 것이 아니기 떄문에 비용이 크게 나오진 않을 것으로 예상한다.

mediasoup의 이해

연결을 해보기 전에 mediasoup에 대해 간단한 이해가 필요하다.
mediasoup architecture
위 그림은 mediasoup 공식 README에서 제공하는 architecture이다. 여기서 중요한 용어를 뽑아서 설명해보겠다.

  • Worker: mediasoup의 하위 프로세스로써, Router를 처리하는데 단일 CPU 코어에서 실행되기 때문에 CPU 코어 개수를 넘지 못한다.
  • Router: Router에서 생성된 Transport를 이용해 미디어 스트림을 주입, 선택 및 전달할 수 있다. 크위치로 바라보면 한 라이브 채널로 볼 수 있겠고, 간단하게는 '방' 개념으로 생각하면 좋다.

    공식문서에는 Router를 multi-party conference room으로 생각할 수 있지만, Router는 그보다 훨씬 low level에서 정의되고 특정 상위 레벨의 use case에 국한되지 않는다고 쓰여져 있습니다. (하나의 방에도 여러 개의 Router가 사용될 수 있고 그건 서비스에 따라 다르다는 것 같습니다.)

  • Transport: Router에 연결되어 Router에서 생성된 Producer, Consumer 등을 양방향으로 전송할 수 있도록 해줍니다. 간단하게는 '통로'정도로 생각하면 된다. 아키텍처에 보이다 싶이 여러 Producer와 Consumer를 등록할 수 있다.
  • Producer: Router에 들어가는 오디오 또는 비디오 소스를 나타낸다.
  • Consumer: 엔드포인트로 전달되는 오디오 또는 비디오 소스를 나타낸다.
  • Device: outer와 연결되어 미디어를 송신 또는 수신하는 endpoint를 나타낸다.

추가적으로 전송하는 Transport를 SendTransport, 수신은 RecvTransport라고 하고 클라이언트에는 local, 서버측은 remote를 붙인다.

코드로 흐름 따라가기

위 내용을 정리해서 크위치에 적용해보면 방송인에겐 SendTransport 하나에 Audio, Video Producer 총 2개가 필요하고, 시청자에겐 RecvTransport 하나에 2개의 Consumer가 필요하다.

따라서 먼저 서버에 Broadcast라는 class를 생성했다.
broadcast.ts

export interface Peer {
  recvTransport: mediasoup.types.WebRtcTransport;
  consumers: Map<string, mediasoup.types.Consumer>; // Map<consumerId, Consumer>
}

export interface Broadcaster {
  sendTransport: mediasoup.types.WebRtcTransport;
  producers: Map<string, mediasoup.types.Producer>; // Map<producerId, Producer>
}

export default class Broadcast {
  private readonly router: mediasoup.types.Router;
  private readonly channel: Channel;

  private readonly broadcaster: Broadcaster = { sendTransport: null, producers: new Map() };
  private readonly peers: Map<string, Peer> = new Map(); // Map<socketId, Peer>

  public title: string;
  
  ... 중략
}

여기서 Peer는 시청자를, Broadcaster는 방송인을 말한다. Broadcast가 진행되고 있는 방송이다. 방송인이 방송을 시작하면 Broadcast 인스턴스를 생성해 Router를 생성한다.

Broadcast 인스턴스는 BroadcastService에서 Map<channelId: string, Broadcast>로 관리된다.

개인적으로 용어의 사용이 더 명확해야 한다고 생각해서 나중에 리팩토링을 해야겠다고 생각중입니다.

broadcast/page.tsx 방송인 측 클라이언트 코드
channels/[channelId]/page.tsx 시청자 측 클라이언트 코드
SFUConnectionHandler.ts mediasoup 관련 Socket.IO handler

  1. 먼저 방송인 측이 방송을 시작하면, 서버에 요청해 RtpCapabilities를 받아 Device를 생성한다.(RtpCapabilities는 mediasoup과 엔드포인트가 처리하는 미디어 수준을 정의한다.)
// 클라이언트 측
const startBroadcast = () => {
  if (!title || onAir) return;

  socket.emit("broadcasts:start", title, async (res: SocketResponse) => {
    if (res.success === false) {
      setWarning(res.message);
      return;
    }

    rtpCapabilities.current = res.content.rtpCapabilities;
    console.log("RTP Capabilities: ", rtpCapabilities.current);
    await _createDevice();

    setWarning("");
    setOnAir(true);
  });
};

const _createDevice = async () => {
  device.current = new mediasoup.Device();
  assert(rtpCapabilities.current, "RTP Capabilities is not defined");
  await device.current.load({
    routerRtpCapabilities: rtpCapabilities.current,
  });
  _createSendTransport();
};
  1. 이후 방송인 측에서 서버에 Transport 생성을 요청한다.
// 클라이언트 측
const _createSendTransport = () => {
  assert(user, "User is not defined");

  socket.emit(
    "sfu:create-transport",
    { channelId: user.channel.id, isSender: true },
    async (res: SocketResponse) => { ... }
  );
};
// 서버 측
const createTransport = async (
  router: mediasoup.types.Router,
): Promise<mediasoup.types.WebRtcTransport> => {
  const { transportOptions } = config.mediasoup;
  const transport = await router.createWebRtcTransport(transportOptions);
  console.log(`transport ID: ${transport.id}`);

  transport.on("dtlsstatechange", (dtlsState) => {
    if (dtlsState === "closed") {
      console.log("transport closed");
    }
  });

  return transport;
};

socket.on(
  "sfu:create-transport",
  async (
    { channelId, isSender }: { channelId: string; isSender: boolean },
    done,
  ) => {
    console.log("Is this a producer request?", isSender);
    const broadcast = this.broadcastService.getBroadcast(channelId);
    const router = broadcast.getRouter();

    try {
      const transport = await createTransport(router);
      if (isSender) {
        broadcast.setSendTransport(transport);
      } else {
        broadcast.setRecvTransport(socket.id, transport);
      }
      done({
        success: true,
        content: {
          id: transport.id,
          iceParameters: transport.iceParameters,
          iceCandidates: transport.iceCandidates,
          dtlsParameters: transport.dtlsParameters,
        },
      });
    } catch (err: any) {
      console.error("Error creating transport:", err);
      done({ success: false, error: err.message });
    }
  },
);

-> 이때 클라이언트에선 Transport 정보를 수신해 localSendTransport를 만들고 서버에 broadcast 인스턴스엔 remoteSendTransport가 등록된다.

  1. 방송인이 어떤 화면을 송출할 것인지 선택한다. (화면을 선택하는 버튼을 누르면 getLocalStream을 호출합니다.
// 클라이언트 측
const getLocalStream = async () => {
  try {
    const stream = await navigator.mediaDevices.getDisplayMedia({
      audio: true,
      video: {
        width: 1280,
        height: 720,
      },
    });
    await _onStreamSuccess(stream);
  } catch (err: any) {
    console.error(err);
  }
};

const _onStreamSuccess = async (stream: MediaStream) => {
  assert(videoRef.current, "Video ref is not defined");
  videoRef.current.srcObject = stream;

  const audioTrack = stream.getAudioTracks()[0];
  const videoTrack = stream.getVideoTracks()[0];

  if (audioTrack) {
    audioParams.current = { track: stream.getAudioTracks()[0] };
  }
  if (videoTrack) {
    videoParams.current = {
      track: stream.getVideoTracks()[0],
      ...videoOptions,
    };
  }

  await _createProducer();
};

-> 이때 audio, video 트랙을 가져와 등록해두었다.

  1. 방송인측에 Producer를 생성한다.
// 클라이언트 측
const _createProducer = async () => {
  assert(sendTransport.current, "Producer Transport is not defined");

  if (audioParams.current) {
    audioProducer.current = await sendTransport.current.produce(
      audioParams.current
    );
    audioProducer.current.on("transportclose", () => {
      console.log("Audio Producer Transport Closed");
    });
    audioProducer.current.on("trackended", () => {
      console.log("Audio Producer Track Ended");
    });
  }
  if (videoParams.current) {
    videoProducer.current = await sendTransport.current.produce(
      videoParams.current
    );
    videoProducer.current.on("transportclose", () => {
      console.log("Video Producer Transport Closed");
    });
    videoProducer.current.on("trackended", () => {
      console.log("Video Producer Track Ended");
    });
  }
};

-> 4번에서 produce 함수를 호출했을 때, Transport에서 등록해 두었던 connectproduce 이벤트가 실행된다.

// 클라이언트 측
sendTransport.current.on(
  "connect",
  async ({ dtlsParameters }, callback, errback) => {
    try {
      socket.emit("sfu:send-transport-connect", {
        channelId: user.channel.id,
        dtlsParameters,
      });
      callback();
    } catch (err: any) {
      errback(err);
    }
  }
);

sendTransport.current.on(
  "produce",
  async (parameters, callback, errback) => {
    try {
      socket.emit(
        "sfu:transport-produce",
        {
          channelId: user.channel.id,
          producerOptions: {
            kind: parameters.kind,
            rtpParameters: parameters.rtpParameters,
          },
        },
        (res: SocketResponse) => {
          if (res.success === false) {
            throw new Error(res.message);
          }
          callback({ id: res.content.id });
        }
      );
    } catch (err: any) {
      errback(err);
    }
  }
 );
  1. 서버는 connect, produce 이벤트에서 발생된 Socket 이벤트로 연결 및 서버 측의 Producer를 생성한다.
// 서버 측
socket.on(
  "sfu:send-transport-connect",
   async ({
     channelId,
     dtlsParameters,
   }: {
     channelId: string;
     dtlsParameters: mediasoup.types.DtlsParameters;
   }) => {
    console.log("DTLS PARAMS...", { dtlsParameters });
    const broadcast = this.broadcastService.getBroadcast(channelId);
    const sendTransport = broadcast.getSendTransport();
    sendTransport.connect({ dtlsParameters });
  },
);

socket.on(
  "sfu:transport-produce",
  async (
    {
      channelId,
      producerOptions,
    }: {
    channelId: string;
    producerOptions: mediasoup.types.ProducerOptions;
    },
    done,
  ) => {
    try {
      const broadcast = this.broadcastService.getBroadcast(channelId);

      const { kind, rtpParameters } = producerOptions;
      const sendTransport = broadcast.getSendTransport();
      const producer = await sendTransport.produce({
        kind,
        rtpParameters,
      });

      console.log(`producer ID: ${producer.id}, kind: ${producer.kind}`);

      producer.on("transportclose", () => {
        console.log("producer transport closed");
      });

      broadcast.addProducer(producer);

      done({ success: true, content: { id: producer.id } });
    } catch (err: any) {
      console.error("Error creating producer:", err);
      done({ success: false, error: err.message });
    }
  },
);
  1. 4번에서의 callback({ id: res.content.id }); 부분을 통해 localSendTransport에 서버 측 Producer의 ID를 전달함으로써 영상 송출이 가능해진다.

  2. 이제 시청자 측은 해당 채널에 입장했을때, 똑같이 Device를 생성한다.

// 클라이언트 측
const _createDevice = async () => {
  device.current = new mediasoup.Device();
  assert(rtpCapabilities.current, "RTP Capabilities is not defined.");
  await device.current.load({
    routerRtpCapabilities: rtpCapabilities.current,
  });
  await _createRecvTransport();
  await _getProducer();
};
  1. 시청자도 서버에 Transport 생성을 요청한다.
// 클라이언트 측
const _createRecvTransport = async () => {
  const transportOptions = await emitAsync("sfu:create-transport", {
    channelId,
    isSender: false,
  }) as mediasoup.types.TransportOptions; // RecvTransport가 생성된 이후에 Consumer를 만들어야 하기 떄문에 async를 사용했습니다.
  console.log("Transport Options: ", transportOptions);

  assert(device.current, "Device is not defined.");
  recvTransport.current =
    device.current.createRecvTransport(transportOptions);

  recvTransport.current.on(
    "connect",
    async ({ dtlsParameters }, callback, errback) => {
      try {
        socket.emit("sfu:recv-transport-connect", {
          channelId,
          dtlsParameters,
        });
        callback();
      } catch (err: any) {
        errback(err);
      }
    }
  );
};

-> 이때 서버측에서 2번에서 사용된 Socket 이벤트가 실행되어 remoteRecvTransport 또한 등록됩니다.

  1. 이후 모든 Producer를 서버에 요청해 가져온다.
// 클라이언트 측
const _getProducer = async () => {
  const { producerIds } = await emitAsync("sfu:get-producers", { channelId });
  console.log("producer IDs: ", producerIds);
  for (const producerId of producerIds) {
    await _createConsumer(producerId);
  }
};

여기서 의아해하실 수도 있는데, 현재 로직은 방송인이 먼저 스크린을 지정하지 않은 상태에서 시청자가 채널에 입장하게 되면 Producer를 받을 수 없어 작동하지 않습니다. 또한 영상 전환도 불가능합니다. 이유는 SFU 방식으로 바뀌면서 퇴장 및 방송 종료 처리 로직이 달라지게 되어서 이를 먼저 처리하기 위해 화면 공유 기능을 먼저 배우면서 제작하다보니 아직 흐름이 완전하진 못합니다. 추후에 언제든 화면 공유를 바꿀때마다 받을 수 있도록 변경할 예정입니다.

  1. 가져온 Producer ID를 이용해 서버에 Consumer를 요청한다. 이때 서버 측에선 canConsume을 통해 가능성을 체크하고 해당 옵션을 돌려준다. 로컬에선 이를 따라 consumer를 생성한다.
// 클라이언트 측
const _createConsumer = async (producerId: string) => {
  assert(device.current, "Device is not defined.");

  if (!recvTransport.current) {
    await _createRecvTransport();
  }

  assert(recvTransport.current, "Recv Transport is not defined.");
  const consumerOptions = await emitAsync("sfu:transport-consume", {
    channelId,
    producerId,
    rtpCapabilities: device.current.rtpCapabilities,
  }) as mediasoup.types.ConsumerOptions;
  console.log("consumer options: ", consumerOptions);
  const consumer = await recvTransport.current.consume(consumerOptions);
  console.log("consumer ID: ", consumer.id);

  assert(videoRef.current, "Video element is not defined.");
  const { track } = consumer;
  if (consumer.kind === "video") {
    videoRef.current.srcObject = new MediaStream([track]);
    socket.emit("sfu:consumer-resume", {
      channelId,
      consumerId: consumer.id,
    });
  } else if (consumer.kind === "audio") {
    // TODO: add audio element
  }
};
// 서버 측
socket.on(
  "sfu:transport-consume",
  async (
    {
      channelId,
      producerId,
      rtpCapabilities,
    }: {
    channelId: string;
    producerId: string;
    rtpCapabilities: mediasoup.types.RtpCapabilities;
    },
  done,
) => {
  try {
    const broadcast = this.broadcastService.getBroadcast(channelId);
    const router = broadcast.getRouter();
    const producer = broadcast.getProducer(producerId);
    const recvTransport = broadcast.getRecvTransport(socket.id);

    if (router.canConsume({ producerId: producer.id, rtpCapabilities })) {
      const consumer = await recvTransport.consume({
        producerId: producer.id,
        rtpCapabilities,
        paused: true,
      });

      consumer.on("transportclose", () => {
        console.log("consumer transport closed");
      });

      consumer.on("producerclose", () => {
        console.log("producer closed");
      });

      broadcast.addConsumer(socket.id, consumer);

      done({
        success: true,
        content: {
          id: consumer.id,
          producerId: producer.id,
          kind: consumer.kind,
          rtpParameters: consumer.rtpParameters,
        },
      });
    }
  } catch (err: any) {
    console.error(err);
    done({
      success: false,
      error: err.message,
    });
  }
},
  );
  1. 9번에서 recvTransport.consume에서 발생한 connect 이벤트에서 remoteRecvTransport와의 연결을 진행한다.
recvTransport.current.on(
  "connect",
  async ({ dtlsParameters }, callback, errback) => {
    try {
      socket.emit("sfu:recv-transport-connect", {
        channelId,
        dtlsParameters,
      });
      callback();
    } catch (err: any) {
      errback(err);
    }
  }
);
// 서버 측
socket.on(
  "sfu:recv-transport-connect",
  async ({
    channelId,
    dtlsParameters,
  }: {
         channelId: string;
         dtlsParameters: mediasoup.types.DtlsParameters;
         }) => {
  console.log("DTLS PARAMS...", { dtlsParameters });
  const broadcast = this.broadcastService.getBroadcast(channelId);
  const recvTransport = broadcast.getRecvTransport(socket.id);
  recvTransport.connect({ dtlsParameters });
},
  );
  1. 9번에서 sfu:consumer-resume Socket 이벤트를 발생시키면서 영상이 재생된다.
// 서버 측
socket.on(
  "sfu:consumer-resume",
  async ({ channelId, consumerId }: { channelId: string, consumerId: string }) => {
    console.log(
      `[consumer-resume] channelId: ${channelId}, socketId: ${socket.id}`,
    );
    const broadcast = this.broadcastService.getBroadcast(channelId);
    const consumer = broadcast.getConsumer(socket.id, consumerId);
    await consumer.resume();
  },
);

-> 이는 서버에서 Consumer를 생성할 때, paused: true 옵션을 주었기 때문인데, RTCPeerConnection 동기화 문제와 비디오 렌더링 최적화 문제로 인하여 공식 문서에선 해당 옵션을 줄 것을 권장한다. resume 함수에 의하여 저잊가 풀리며 재생된다.

여기까지 흐름을 정리해 보았다. 자세한 내용은 위의 Github에 접속해서 흐름을 살펴보는걸 추천한다.

시연

시연 사진
Chrome과 Edge를 사용해서 두개의 클라이언트를 실행해보았다.
가운데를 잘라 오른쪽 화면을 Chrome에서 공유했고, Edge에서 클라이언트에서 잘 실행되는 것을 볼 수 있다.

참조

profile
이거이 니 정주영이고 이병철이야

0개의 댓글

관련 채용 정보