Cloud PubSub 도입기 (Java/Spring Boot, Go/Echo)

이정진·2024년 8월 20일
0

개발

목록 보기
19/21
post-thumbnail

회사에서 Cloud PubSub을 활용해 대용량 메일/문자 발송 서비스를 개발하게 되어, 관련 내용을 허락을 받아 정리한다.

Cloud PubSub

Cloud PubSub은 메시지를 생성하는 서비스를 해당 메시지를 처리하는 서비스에서 분리하는 확장 가능한 비동기 메시징 서비스이다.

주요 특징은 아래와 같다.

  • 게시자(Publisher)구독자(Subscriber)라는 이벤트 제작자 및 소비자 시스템으로 만들어진다.
    • 게시자는 동기식 리모트 프로시져 콜(RPC)가 아닌 이벤트를 브로드 캐스트로 구독자와 비동기적으로 통신
  • 비동기적으로 100밀리초의 지연시간으로 통신할 수 있다.
  • 구독 서버가 다운된 상황에도 해당 메시지들이 큐에 쌓여 있기에, 서버 재시작 이후 순차적으로 처리할 수 있다.
  • Apache Kafka와 달리 파티션 기반 메시징이 아닌 메시지당 동시 로드 방식을 사용한다.
    • 개별 메시지를 구독자 클라이언트에 ‘임대’한 다음 지정된 메시지가 성공적으로 처리되었는지 주기적으로 확인하는 방식

Cloud PubSub 구조

  • Publisher: 게시자 또는 제작자라고 칭하며, 특정 주제에 대한 메시지를 만들어 메시징 서비스로 전송(게시)하는 역할이다.
  • Message: 서비스를 통해 이동하는 데이터
  • Topic: 주제라고 칭하며, 메시지 피드를 나타내는, 이름이 지정된 항목이다.
  • Schema: Pub/Sub 메시지의 데이터 형식을 제어하는 이름이 지정된 항목이다.
  • Subscription: 구독이라고 칭하며, 특정 주제의 메시지 수신 의향을 나타내는, 이름이 지정된 항목이다.
  • Subscriber: 구독자 또는 소비자라고 칭하며, 지정한 구독에 대한 메시지를 수신한다.

Cloud PubSub 워크 플로우

  1. Publisher가 PubSub Topic에 Message를 전송한다.
  2. Message가 Storage에 기록된다.
  3. Storage에 Message를 기록하는 것과 함께 PubSub이 해당 Topic의 모든 연결된 Subscription에 Message를 전달한다. (구독은 단일 구독일 수도, 다중 구독일 수도 있다.)
  4. Subscription이 Message를 연결된 Subsriber에게 전송한다.
  5. Subscriber가 Message 처리 확인 여부를 PubSub으로 전송한다.
  6. 각 Subscription에 대해 하나 이상의 Subscriber가 Message를 확인하면 PubSub이 Message를 Storage에서 삭제한다.

‘가상 면접 사례로 배우는 대규모 시스텀 설계 기초’ 책에서 PubSub 구조를 읽은 적이 있는데, 해당 책에서 칭하는 소비자 그룹이 여기서의 Subscription이며, 소비자 그룹 내 각각의 소비자가 Subscriber로 이해하면 될 것이다.

PubSub 게시 및 구독 패턴

  • Many-to-one(다대일/팬인)
    • 여러 게시자 애플리케이션이 단일 주제로 메시지를 게시
  • Many-to-many(다대다/부하 분산)
    • 단일 또는 여러 게시자 애플리케이션이 단일 주제로 메시지를 게시하고, 단일 구독에 연결되어 있어, 여러 개의 구독자 애플리케이션에 연결
    • 여러 구독자를 사용해서 규모에 맞게 메시지를 처리하는 방식에 적합
  • One-to-many(일대다/팬아웃)
    • 단일 또는 여러 게시자 애플리케이션이 단일 주제로 메시지 게시, 여러 구독에 연결
    • 동일한 메시지 집합에서 여러 다른 데이터 작업을 수행하는 경우 적합

대용량 메일/문자 발송 서비스를 구축하게 되었으므로, 여기서는 부하 분산 패턴을 사용할 것이다.

구독 개요 (Pull/Push/내보내기)

(출처: tachingchen 블로그 / 2024-06-24)

구독 유형

  • Pull: 구독자 클라이언트를 사용하여 PubSub 서버에서 메시지를 요청
  • Push: PubSub 서버를 사용하여 구독자 애플리케이션에 메시지 전송을 요청
  • 내보내기 구독: 메시지를 Google Cloud 리소스로 직접 내보낼 수 있다.

내보내기는 GCP 문서에는 존재하지만, Google Cloud 리소스로 직접 내보내는 경우에 대해서만 지원하는 것으로 보이며, 관련한 자세한 레퍼런스를 아직 찾지 못했다.

구독 유형별 비교

기능Pull(가져오기)Push내보내기
사용 사례- 대량 메세지
- 메시지 처리의 효율성과 처리량이 대단히 중요
- 자체 서명되지 않은 SSL 인증서가 있는 공개 HTTPS 엔드포인트를 설정할 수 없는 환경
- 여러 주제를 같은 웹훅으로 처리해야 함
- App Engine 표준 및 Cloud Functions 구독자
- Google Cloud 종속 항목(사용자 인증 정보와 클라이언트 라이브러리)을 설정하기 어려운 환경
- 메시지가 초당 수백만 개까지 확장될 수 있는 대량 메시지
- 메시지가 추가 처리 없이 Google Cloud 리소스에 직접 전송됨
엔드포인트사용자 인증 정보를 증명한 인터넷상의 모든 기기는 Pub/Sub API를 호출할 수 있음- 공개 웹에 액세스 가능한, 자체 서명되지 않은 인증서가 있는 HTTPS 서버
- 수신 엔드포인트는 PubSub 구독과 분리될 수 있으며, 따라서 여러 구독의 메시지가 단일 엔드포인트에 전송됨
- BigQuery 구독의 BigQuery 데이터 세트 및 테이블
- Cloud Storage 구독의 Cloud Storage 버킷
부하 분산-여러 구독자가 같은 '공유' 구독에 가져오기 호출을 보낼 수 있음
- 각 구독자는 메시지 하위 집합을 수신함
push 엔드포인트가 부하 분산기가 될 수 있음PubSub 서비스가 자동으로 부하를 분산
구성구성할 필요가 없음- 구독자와 같은 프로젝트에 있는 App Engine 앱은 구성할 필요가 없음
- 푸시 엔드포인트 확인은 Google Cloud 콘솔에서 필요하지 않음
-엔드포인트는 DNS 이름을 통해 연결할 수 있으며, SSL 인증서가 설치되어 있어야 함
- 적절한 권한으로 구성된 BigQuery 구독에 대해 BigQuery 데이터 세트와 테이블이 있어야 함
- 적절한 권한으로 구성된 Cloud Storage 구독에 대한 Cloud Storage 버킷이 있어야 함
흐름 제어구독자 클아이언트가 전달 속도를 조절함. 구독자는 확인 기한을 동적으로 수정하며, 따라서 메시지 처리에 걸리는 시간이 임의로 길어질 수 있음PubSub 서버가 자동으로 흐름 제어를 구현함. 클라이언트 측에서 메시지 흐름을 처리할 필요는 없지만 HTTP 오류를 되돌려 보내 클아이언트가 현재 메시지 부하를 처리할 수 없음을 표시할 수는 있음PubSub 서버가 Google Cloud 리소스에 대한 메시지 쓰기를 최적화하기 위해 자동으로 흐름 제어를 구현
효율성 및 처리량일괄 전송, 확인, 대량의 동시 소비가 가능해 낮은 CPU와 대역폭에서도 높은 처리량을 구현함. 메시지 전송 시간 최소화를 위해 적극적인 폴링을 사용하면 효율성이 떨어질 수 있음요청당 메시지 1개를 전달하며 대기 메시지 최대 숫자를 제한함확장성은 PubSub 서버에 의해 동적으로 처리됨

Cloud PubSub + Spring Boot/Echo

기존 대용량 메일 발송 서비스는 Monolithic으로 개발한 상황이다. 나는 Cloud PubSub의 도입을 주장하였는데, 근거는 아래와 같다.

  • 기존 Monolithic으로 구현된 서비스는 확장성에서 불리하다.
    • 대용량 메일 발송 기능만 구현되어 있는데, 문자 발송 기능이 추가 예정이기 때문이다.
  • 동기 형식으로 개발되어 있는 API
    • Elixir로 개발을 진행했는데, API 요청으로 Connection이 30초를 넘어가는 순간 Stream Timeout 문제가 발생하기에 비동기로 전환할 필요성이 있다.
  • 회사 인프라는 GCP를 클라우드로 사용한다.
    • 회사가 GCP를 사용하고 있기에, AWS SES 등을 추가적으로 도입해 활용하는 것보다 Cloud PubSub을 활용해 비동기로 API 서버와 발송 서버를 분리하는 것이 효과적일 것이다.

다양한 의견 교류 과정을 거쳐, 최종적으로 Spring Boot를 API 서버로, Echo를 발송 서버로 활용하기로 결정했다.

Spring Boot를 API 서버로 선택한 이유

  • Spring Security 기반으로 인증/인가 로직 개발 용이
  • 다양한 레퍼런스와 공식 문서 지원
  • 프로젝트 진행 경험이 있는 언어 및 프레임워크로 적은 러닝 커브, 빠른 개발 속도로 기개발된 API 마이그레이션 가능
  • Elixir와 달리 국내 개발자 풀이 넓어, 새로운 인력이 들어왔을 때 코드 이해도 높아질 것이라는 예상

Echo를 발송 서버로 선택한 이유

  • 높은 동시성과 빠른 실행 속도
  • 상대적으로 간단한 속도로 높은 성능과 확장성을 가지는 Echo 프레임워크
  • 적은 메모리로 대량의 요청 처리 가능
  • (Optional) 더 높은 동시성 효율을 위해 고루틴 사용 가능

Pull vs Push

먼저 내보내기는 BigQuery과 Cloud Storage 서비스를 지원하기에, 사용할 수 없어서 제외했다.

이 서비스에 대한 Push 모델과 Pull 모델의 장/단점을 아래의 표로 정리할 수 있다.

장점단점
Push- 실시간 처리 가능(메시지 발생 즉시 전달)
- 서버는 엔드포인트만 제공하면 바로 처리가 가능하므로 개발 용이
- 구독 서버가 항상 메시지를 받을 준비를 하고 있어야 함
- 트래픽의 증가에 따른 부하 관리가 어려움
- 대용량 메일 요청이 왔을 때 Stream Timeout에 대한 관리 필요
Pull- 구독자가 메시지를 가져오는 시기와 양을 조절 가능
- 트래픽이 몰릴 경우, 구독자를 새로 추가 및 제거하는 방식으로 관리 가능
- 구독자가 일시적으로 중단되어도 손실을 최소화할 수 있음
- 실시간성이 Push 모델에 비해 떨어짐
- 구독자 측에서 별도의 풀링 로직을 개발해야 함

최종적으로는 Push모델을 선택했다. Push모델을 선택한 이유는 아래와 같다.

  • 실시간 처리 가능
  • 발송 서버의 개발 용이성
  • PubSub 내 재시도 로직 존재 (구독 서버가 메시지 받을 준비가 안 되어 있다, 정상화되면 바로 처리 가능)
  • 대용량 요청 처리 시, 고루틴이라는 해결책 존재

Cloud PubSub 설정

로컬에서 Cloud PubSub 설정

GCP는 정말 친절하게도 Cloud PubSub Emulator를 Docker 환경에서 활용할 수 있도록 제공하고 있다.

Docker Image Pull

먼저 Docker Image를 아래의 커맨드를 활용하여 가져온다.

docker pull google/cloud-sdk:emulators

Docker Run

가져온 이미지로 컨테이너를 실행한다.

docker run --rm -p 8085:8085 google/cloud-sdk:emulators /bin/bash -c "gcloud beta emulators pubsub start --project=some-project-id --host-port='0.0.0.0:8085'"
  • 일반적으로 8085 포트를 사용하기에, 8085를 포트번호로 지정했다.
  • google/cloud-sdk:emulators 이미지에서 PubSub Emulator를 명령어로 실행시키는 방식이다.
  • 여기서 사용되는 project id는 로컬에서만 사용하는 project id이다. 나는 study-pubsub으로 진행했다.
  • 백그라운드 실행을 위해서는 -d 옵션을 활용해야 한다.

실행시키면, 위 이미지와 같이 PubSub의 테스트용이라고 뜨면서 실행된다.

이를 활용해서, 비용 발생 없이 테스트해볼 수 있을 것이다.

나는 CLI에서 PubSub 제어하는 것이 너무 고통스러워, 결국 GCP에 직접 띄워서 진행했다.

GCP에서 Cloud PubSub 설정

Cloud PubSub 주제 설정

  1. 주제 만들기 클릭

  2. 주제 ID 설정

    발송 서버의 예제이므로, 주제 ID를 send-test로 진행했다.
    페이지 내에 보면, 기본 구독 추가 ~ Cloud Storage에 메시지 데이터 백업 등 다양한 선택지가 존재한다. 정리하면 아래와 같다.

    • 기본 구독 추가: 구독 설정이 기본 구독값으로 구성됩니다. 기본값은 아래와 같습니다.
      1. pull 전송 유형
      2. 메시지 보관 기간 7일
      3. 31일 동안 활동이 없으면 만료됨
      4. 확인 기한 10초
      5. 즉시 재시도 정책
    • 스키마 사용: 기존 스키마를 가져와 할당하거나 새로운 스키마를 만들어 적용할 수 있다. (스키마 = 주제의 메시지가 따라야 하는 형식)
    • 메시지 보관 사용: 발송된 메시지의 히스토리를 최대 31일동안 보관 및 확인할 수 있다는 의미이다. 유료임을 유의해야 한다.
    • BigQuery로 메시지 데이터 내보내기: Publish된다면 BigQuery테이블로 메세지 데이터가 직접 전송된다.
    • Cloud Storage에 메시지 데이터 백업: Publish된다면 Cloud Storage의 버킷으로 메세지 데이터가 직접 전송된다.

여기서 Push 모델을 사용할 예정이므로, 기본 구독 추가 옵션을 선택한 이후, 생성된 주제 안에서 Push 모델로 설정을 변경할 것이다.

  1. Push모델 변경
    구독 수정에 접속한다면, 전송 유형을 선택할 수 있다. 여기서, 푸시를 선택하면 된다.

    유의사항: Push모델은 SSL/TLS 보안 인증이 되어 있는 엔드포인트에 한해서 가능하다. 그렇기에, 로컬에서 경험하고자 개발하고 있다면, 아래의 개발 과정에서 설명하고 있는 ngrok을 활용하여 https 도메인을 받아서 푸시 엔드포인트를 업데이트하면 된다.

  2. 재시도 설정
    구독하고 있는 소비자에게 메세지를 전달하였지만, 실패된 경우 Cloud PubSub은 재시도를 지원한다. 이 때, 즉시 재시도와 지수 백오푸 지연 후 재시도 등의 옵션이 있으니 상황에 맞추어 설정하면 된다. 여기서는 기본값인 즉시 재시도를 유지했다.

개발

Message 구조

Cloud PubSub에서 메시지 구조는 일반적으로 아래와 같다.
(출처: PubSub REST API Message Docs)

{
	"message": {
      "data": string,
      "attributes": {
        string: string,
        ...
      },
      "messageId": string,
      "publishTime": string,
      "orderingKey": string
    },
    "subscription": "topic/subsription name"
}

여기서, publishTime, messageId, subscription를 제외하고는 모두 선택사항이다.

나는 여기서, data/attributes 이 두 필드를 활용해 필요한 데이터를 전달 및 검증을 진행할 예정이다.
1. data 필드는 실제로 필요한 데이터들을 담고 있다.
발송을 처리하는 로직에 대한 예제이므로, 단편적으로 이메일 제목/본문/수신자 정보만을 공유하도록 처리했다.

{
  "title": "이메일 제목",
  "body": "이메일 본문",
  "receiver": "수신자"
}

2. attributes 필드에는 서버 간 별도 지정한 인증키를 저장하여, 유효성 검증을 진행한다.
유효성 검증은 subscription을 통해서, 지정된 Topic에 대한 구독인지를 검증하는 것이 1차이며, attributes 필드의 인증키 검증을 통해 2차 인증을 진행한다. 2차 인증을 도입한 이유는 혹시 모를 Topic 노출 사태에 대비하는 차원과 보안 수준은 높으면 높을수록 좋다고 생각하기 때문이다.

주의사항: data 필드는 key-value 형태가 아니다. Base64로 인코딩된 문자열이여야 하므로, 발행과 구독 파트 모두 인코딩/디코딩 로직을 포함해야 한다.

Go/Echo 개발

먼저 구독 파트를 개발한다.
구독 파트를 먼저 개발하는 이유는 Push모델 특성상 SSL/TLS 인증이 필수적이기 때문이다.
로컬에 띄운 서버의 엔드포인트로 SSL/TLS 엔드포인트를 부여하고자 한다면, ngrok을 사용해서 엔드포인트를 발급받은 후, 해당 엔드포인트를 Cloud PubSub에 등록하는 과정을 진행해야 한다. ngrok에 대해 궁금하신 분들은, 과거에 작성한 ngrok 글을 읽어보시면 좋을 것 같다.

API 개발

handler

func (eh *EmailHandler) SendEmail(c echo.Context) error {
	// Request Data Binding
	var pubsubMessage dto.PubSubMessage

	// Request Body와 Request DTO 매핑, 오류 발생 시 401 반환
	err := c.Bind(&pubsubMessage)
	if err != nil {
		return err
	}

	fmt.Println(pubsubMessage)

	// Service Method
	ctx := c.Request().Context()
	serviceErr := eh.EmailService.SendEmail(ctx, pubsubMessage)
	if serviceErr != nil {
		return serviceErr
	}

	return c.JSON(http.StatusOK, common.NewSuccessResponseWithoutData())
}

service

func (es *EmailService) SendEmail(ctx context.Context, pubsubMessage dto.PubSubMessage) error {
	// Transaction Start
	tx := es.EmailRepository.DB.Begin()
	defer func() {
		if err := recover(); err != nil {
			tx.Rollback()
		}
	}()

	// Validation - PubSub Topic Check
	if pubsubMessage.Subscription != config.GetEnvVar("GCP_PUBSUB_TOPIC") {
		tx.Rollback()
		return &common.InvalidSubscriptionError
	}

	// Validation - PubSub Authentication Key Check
	if pubsubMessage.Message.Attributes["authentication"] != config.GetEnvVar("AUTHENTICATION_KEY") {
		tx.Rollback()
		return &common.InvalidAuthenticationKeyError
	}

	// Validation - Cloud PubSub Message Decoding & Binding
	decodedData, decodeErr := pubsubMessage.DecodedData()
	if decodeErr != nil {
		tx.Rollback()
		return decodeErr
	}

	var emailRequest dto.EmailRequest
	if err := json.Unmarshal(decodedData, &emailRequest); err != nil {
		tx.Rollback()
		return err
	}
	email, findErr := es.EmailRepository.FindByID(ctx, tx, emailRequest.ID)
	if findErr != nil {
		tx.Rollback()
		return findErr
	}

	// Business Logic
	// ThirdParty Email Send - Random Response
	emailErr := randomResponse()

	// 발송 상태 업데이트
	if emailErr != nil {
		email.Status = model.FAILED
		es.EmailRepository.Update(ctx, tx, email)
	} else {
		email.Status = model.Sent
		es.EmailRepository.Update(ctx, tx, email)
	}

	// Transaction End
	if err := tx.Commit().Error; err != nil {
		tx.Rollback()
		return err
	}

	return nil
}

func randomResponse() error {
	// 시드 초기화
	rand.Seed(time.Now().UnixNano())

	// 0 또는 1 랜덤 생성
	randomBit := rand.Intn(2) // 0과 1 중 하나를 랜덤으로 반환

	// 0이면 에러 반환
	if randomBit == 0 {
		return &common.EmailError
	} else {
		return nil
	}
}

repository

func (er *EmailRepository) FindByID(ctx context.Context, tx *gorm.DB, emailId uint) (*model.Email, error) {
	var email model.Email
	if err := tx.WithContext(ctx).Where("id = ?", emailId).First(&email).Error; err != nil {
		return nil, err
	}
	return &email, nil
}

func (er *EmailRepository) Update(ctx context.Context, tx *gorm.DB, email *model.Email) error {
	if err := tx.WithContext(ctx).Save(email).Error; err != nil {
		return err
	}
	return nil
}

Java/Spring Boot 개발

구독 파트는 개발을 완료했으니, 이제는 발행 파트이다.

기본적인 프로젝트 환경은 생략한다. PubSub에 메세지를 발행하는 PubSubClient와 비즈니스 로직 코드를 공유한다.

주요 특징으로는, PubSub Dependency 내에 인코딩 로직이 포함되어 있다. 그렇기에, 메세지 발행하는 PubSubClient에서는 ObjectMapper를 활용해 Json형태로만 만들어주었다.

Base64 인코딩 파트를 찾기 위해, 라이브러리에서 Publish 메소드까지 따라 들어가보았지만, 찾지 못했다. 내부적으로 gRPC를 기반으로 통신하는데, 해당 위치에서 인코딩 로직이 있는 것으로 보인다.

build.gradle

// Pubsub
implementation group: 'com.google.cloud', name: 'spring-cloud-gcp-starter-pubsub', version: '5.4.1'

Maven Repository에서 Cloud PubSub에 활용하는 의존성을 확인할 수 있다.

PubSubClient

spring:
	cloud:
      gcp:
        credentials:
          location: {GCP 서비스 계정 경로}
        project-id: {GCP 프로젝트 ID}
        pubsub:
          topic: {Cloud PubSub Topic}
          authentication-key: {서버 간 공유 인증키}
@Component
@RequiredArgsConstructor
public class PubSubClient {

    private final PubSubTemplate pubSubTemplate;
    private final ObjectMapper objectMapper;

    @Value("${spring.cloud.gcp.pubsub.topic}")
    private String topicName;

    @Value("${spring.cloud.gcp.pubsub.authentication-key}")
    private String authenticationKey;

    public void publishMessage(MessageContent messageContent) {
        try {
            // Convert to Json
            String message = objectMapper.writeValueAsString(messageContent);

            // Generate PubsubMessage Object with attributes
            PubsubMessage pubsubMessage = PubsubMessage.newBuilder()
                    .setData(com.google.protobuf.ByteString.copyFromUtf8(message))
                    .putAttributes("authentication", authenticationKey)
                    .build();

            // Publish message
            pubSubTemplate.publish(topicName, pubsubMessage);
        } catch (JsonProcessingException jsonProcessingException) {
            throw new RuntimeException(jsonProcessingException.getMessage());
        }
    }
}

비즈니스 로직

@Service
@RequiredArgsConstructor
public class EmailService {

    private final EmailRepository emailRepository;
    private final PubSubClient pubSubClient;

    public void sendEmail(EmailReq emailReq) {
        Email email = emailReq.from();
        Email savedEmail = emailRepository.save(email);
        MessageContent messageContent = new MessageContent(
                savedEmail.getId(),
                savedEmail.getTitle(),
                savedEmail.getBody(),
                savedEmail.getReceiver()
        );
        pubSubClient.publishMessage(messageContent);
    }
}

개발한 이후, 테스트해보면 아래와 같이 게시 시도 카운트가 올라가는 것을 GCP 대시보드에서 확인할 수 있다.

발송 서버에서는 0/1을 랜덤으로 뽑아 성공/실패를 결정하도록 하여, 이메일 서드파티에서 문제상황이 발생하는 것을 가정해서 진행해보았는데, 실제로 FAILEDSENT가 정상적으로 분기되는 것을 확인할 수 있다.

Cloud PubSub을 연결하여 사용하는 과정에 대한 예시 코드는 Github Repo에서 확인할 수 있다.


후기
회사에서 도입했던 Cloud PubSub을 도입 근거부터 도입 예제까지 정리하면서, 다른 다양한 인프라들도 적용해보고 싶다는 생각이 많이 들었다. 오버 엔지니어링을 경계면서, 서비스 개선을 위한 다양한 인프라를 앞으로 학습하고 적용해볼 예정이다.


레퍼런스

0개의 댓글