[GCP] Cloud PubSub

Hocaron·2022년 1월 29일
3
post-custom-banner

Cloud PubSub

  • 이벤트를 처리하는 서비스에서 이벤트를 생성하는 서비스를 분리하는 비동기 메시징 서비스이다.
    • 메시지들을 Queue에 넣고 차례대로 전달해주는 서비스
      -> 따라서, 응답을하는 서버가 다운되더라도 요청이 들어온 메시지들은 큐에 쌓여있기 때문에 서버가 살아나면 즉시 처리를 진행할 수 있다.
      -> 여러 대의 서버가 하나의 큐를 바라보도록 할 경우, 처리 데이터가 많아져도 서버는 자신의 처리량에 맞는 요청만 가져와서 처리할 수 있다.
  • 서버리스 환경이기 때문에 별도의 인스턴스가 필요 없다.
  • GCP와 외부의 시스템을 빠르게 통합하는데 이용할 수 있다.

그림으로 보면 이해가 쉽다.

리소스설명
주제(Topic)게시자(publisher)가 메시지를 전송하는, 이름이 지정된 리소스
구독(Subscription)특정 단일 주제(Topic)의 메시지 스트림이 구독 애플리케이션으로 전달되는 과정을 나타내는, 이름이 지정된 리소스
메시지(message)Topic에 쌓이는 데이터
게시자(Publisher)특정 주제에 대한 메시지를 만들어 메시지 서비스를 전송하는 사람
구독자(Subscriber)지정한 Subscription에 대한 Message를 받는 사람

  • 누가 Publisher가 될 수 있을까?
    • googleapis.com에 HTTPS를 요청할 수 있는 모든 애플리케이션
    • App Engine 앱
    • Coogle Compute Engine
    • 다른 타사 네트워크에서 호스팅되는 웹 서비스
    • 데스크톱
    • 휴대기기에 설치된 App
    • 브라우저

Publisher의 메세지가 Subscriber에게 어떻게 가는지 알아보자.


1. Publisher애플리케이션은 Pub/Sub서비스에서 Topic을 생성하고, MessageTopic으로 전달한다.
2. Message는 Cloud Pub/Sub의 Message Storge에 Subscriber가 확인할 때까지 보존된다.
3. Cloud Pub/Sub은 MessageTopic에서 개별 Subscription으로 전달한다.
각각 구독은 구독자가 선택한 곳으로 push /pull된 메시지를 전송한다.
4. Subscriber가 지정한 endpoint로 Pub/Sub이 내보낸 메시지를 Subscriber가 받는다.
5. Subscriber는 수신된 각 Message에 대해서 acknowledgement를 Pub/Sub으로 보낸다.
6. Pub/Sub서비스는 Subscription의 메시지 Queue에서 Message를 삭제한다.

Publisher와 Subscriber의 다양한 관계

  • 다음과 같이 통신은 일대다(fan-out), 다대일(fan-in), 다대다 형태를 취할 수 있다.

Pull/Push 구독 방법

Pull(가져오기)


가져오기 전달에서는 구독자 애플리케이션이 Pub/Sub 서버에 메시지 검색을 요청한다.

  1. Subscriber애플리케이션은 Message 달라고 요청한다.
  2. Pub/Sub 서버가 Message와 확인 ID를 보낸다.
  3. Subscriber는 ACK을 전달하고, Pub/Sub서비스는 Subscription의 메시지 Queue에서 Message를 삭제한다.

Push(내보내기)


1. Pub/Sub 서버는 각 메시지를 미리 구성된 엔드포인트에 있는 Subscriber애플리케이션에 메시가 왔다는 데이터를 전달한다.
2. Subscriber는 ACK을 전달하고, Pub/Sub서비스는 Subscription의 메시지 Queue에서 Message를 삭제한다.

각각의 방법에는 특징이 있으니, 서비스에 맞는 방법을 선택하자.

Pull(가져오기)Push(내보내기)
대량 메시지(초당 1개보다 훨씬 많음)여러 주제를 같은 webhook으로 처리해야 함
메시지 처리의 효율성과 처리량이 대단히 중요함App Engine 표준 및 Cloud Functions 구독자
자체 서명되지 않은 SSL 인증서가 있는 공개 HTTPS 엔드포인트를 설정하기 어려움Google Cloud 종속 항목(사용자 인증 정보와 클라이언트 라이브러리 등)을 설정하기 어려운 환경
구독자 클라이언트가 전달 속도를 조절함. 구독자는 확인 기한을 동적으로 수정하며, 따라서 메시지 처리에 걸리는 시간이 임의로 길어질 수 있음Pub/Sub 서버가 자동으로 흐름 제어를 구현함. 클라이언트 측에서 메시지 흐름을 처리할 필요는 없지만, HTTP 오류를 되돌려보내 클라이언트가 현재 메시지 부하를 처리할 수 없음을 표시할 수는 있음
일괄 전달과 확인 및 대량의 동시 소비가 가능해 낮은 CPU와 대역폭에서도 높은 처리량을 구현함. 메시지 전달 시간 최소화를 위해 적극적인 폴링을 사용하면 효율성이 떨어질 수 있음요청당 메시지 1개를 전달하며 대기 메시지 최대 숫자를 제한함

😱나는 Pull 방법을 선택했다. 일단 Push로 구현하기 위해서는 사용자 도메인 인증을 받아야 했는데

  • 푸시 엔드포인트는 공개적으로 액세스할 수 있는 HTTPS 주소여야 한다.
  • 푸시 엔드포인트의 서버에는 인증 기관에서 서명된 유효한 SSL 인증서가 있어야 한다.

도메인을 구매하지 못한 나는 우선 Pull 방법을 사용하기로 했다. 사용자 도메인 인증도 절차가 엄청 간단하지는 않은 것 같다. 하지만, 서버 리소스를 덜 사용할 것 같으니 다음에 사용해보기로 하자.

사용자의 메일함에 메일이 올 때마다 알림을 받고 싶어요!

😎우리 서비스는 사용자 메일함에 메일이 올 때마다, 메일 개수를 확인하고 사용자가 설정한 메일 개수와 같으면 사용자에게 메일을 삭제해보시겠어요?라고 알림을 보내주는 기능이 있다.

Pub/Sub 서버에게 사용자 이메일 알림 받을래요 요청 보내는 부분

API버전

POST "https://www.googleapis.com/gmail/v1/users/me/watch"
Content-type: application/json

{
  topicName: "projects/myproject/topics/mytopic",
  labelIds: ["INBOX"],
}

라이브러리 버전

import {gmail} from '@googleapis/gmail';

... 

    const googleMail = gmail({
      version: 'v1',
      auth: oAuth2Client, // google api를 통해 인증을 해줘야한다.
    });

    const labels = ['INBOX', 'SPAM']; // 내가 푸시 알림 받고 싶은 메일함 종류
	
	await googleMail.users.watch({
		userId: user.gmail,
        requestBody: {
          labelIds: labels,
          labelFilterAction: 'include', 
          // include이면 labelIds에 포함되어 있는 메일함에 메일이 오면 메세지 받고 싶어요
          // exclude이면 labelIds에 포함되어 있는 메일함 제외한 메일함에 메일이 오면 메시지 받고 싶어요
          topicName: '구독할 주제이름',
        },
      });

Pull(가져오기)로 Pub/Sub 서버로 부터 메시지 받기

const {PubSub} = require('@google-cloud/pubsub');

const pubSubClient = new PubSub();

function listenForMessages() {
  const subscription = pubSubClient.subscription(subscriptionName);

  let messageCount = 0;
  const messageHandler = message => {
    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${message.data}`);
    console.log(`\tAttributes: ${message.attributes}`);
    messageCount += 1;

    // "Ack" (acknowledge receipt of) the message
    message.ack();
  };

  // Listen for new messages until timeout is hit
  subscription.on('message', messageHandler);

  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}

listenForMessages();

이렇게 하면 Pub/Sub 서버가 전해주는 메시지

  • 내가 하나의 메일함에 여러번 watch api를 보내도 메시지는 하나만 온다.
  • 여러번 watch api를 보내고 구독 취소 (watch 취소)하는 api 하나만 보내도, 더 이상 메세지는 오지 않는다. -> 헷갈리지 않아서 좋다.

추가로, Pub/Sub 서버에게 사용자 이메일 알림 더이상 안 받을래요 요청 보내는 부분

API버전

POST https://gmail.googleapis.com/gmail/v1/users/{userId}/stop
Content-type: application/json

{
  userId: "사용자 이메일",
}

라이브러리 버전

import {gmail} from '@googleapis/gmail';

... 

    const googleMail = gmail({
      version: 'v1',
      auth: oAuth2Client, // google api를 통해 인증을 해줘야한다.
    });

	await googleMail.users.stop({
		userId: user.gmail,
	});

해당 api들을 요청하기 위해서 필요한 OAuth scope

  • 하나만 충족해도 요청이 잘 된다.
https://mail.google.com/
https://www.googleapis.com/auth/gmail.modify
https://www.googleapis.com/auth/gmail.readonly
https://www.googleapis.com/auth/gmail.metadata

관련 코드는 여기에

구독하고 싶어요 / 구독 그만 할래요 관련 코드

https://github.com/ah-ha-dev/ah-ha-api-server/blob/develop/src/user/user.service.ts

Pull(가져오기) 관련 코드

https://github.com/ah-ha-dev/ah-ha-api-server/blob/develop/src/push-notification/push-notification.service.ts

  • 나는 NestJS를 사용했고, OnApplicationBootstrap 훅을 사용해서 다른 요청이 없어도 Application이 시작되면, Pub/Sub 서버로 부터 메시지를 받을 수 있도록 했다.

References

profile
기록을 통한 성장을
post-custom-banner

0개의 댓글