[GCP] Google Cloud Pub/Sub 서비스의 핵심 개념과 실습 튜토리얼 (UI / Python)

NewNewDaddy·2025년 8월 20일
0

GCP

목록 보기
8/9
post-thumbnail

🔹 0. INTRO

  • 현대 소프트웨어 아키텍처에서 시스템 간의 효율적인 통신은 필수적입니다. 특히 마이크로서비스 환경에서는 각 서비스가 독립적으로 동작하면서도 서로 간의 데이터를 주고받아야 하는 상황이 자주 발생합니다. 이 때 사용할 수 있는 개념으로 Queue 라는 자료구조가 있습니다.
    Queue를 통해 우리는 시스템 간의 느슨한 결합 가지는 아키텍처를 구성할 수 있습니다.
    우리에게 익숙한 AWS 클라우드에서 가장 먼저 런칭된 서비스가 AWS SQS라고 하는 비동기 메세징 서비스였습니다. GCP에도 이와 동일한 서비스가 있는데 바로 'Pub/Sub'이라는 서비스입니다.
    이번 글에서는 Google Cloud Pub/Sub 서비스의 개념에 대해 알아보고 python을 활용하여 어떻게 다룰 수 있는지 실습해보도록 하겠습니다.

🔹 1. Google Cloud Pub/Sub이란?

  • Google Cloud Pub/Sub은 완전 관리형의 비동기 메시징 서비스로, 발행자(Publisher)와 구독자(Subscriber) 간의 메시지 전달을 할 수 있습니다. 전통적인 동기식 통신 방식과 달리, 메시지를 보내는 쪽과 받는 쪽이 서로를 직접 기다리지 않고도 데이터를 교환할 수 있게 하여 서비스간의 느슨한 결합을 가능하게 해줍니다.

▪ 핵심 개념 설명

토픽(Topic)

  • 메세지가 저장되는 저장소로, 발행자가 보낸 메세지를 임시로 보관하는 역할을 합니다.
  • 저장되는 메세지의 종류에 따라 여러 토픽 생성이 가능하며, GCP 프로젝트 내에서는 고유한 이름으로 식별되어야 합니다.

발행자(Publisher)

  • 데이터를 생성하고, 이 생성된 데이터를 특정 토픽에 전송하는 역할을 하는 컴포넌트입니다.

구독자(Subscriber)

  • 특정 토픽을 구독하여 발행자가 보낸 메세지를 수신하고 처리하는 역할을 담당하는 컴포넌트입니다.
  • 단순히 메세지를 읽어올 수도 있고, 읽어온 메세지를 BigQuery나 GCS에 저장하도록 설정할 수도 있습니다.

발행자 - 토픽 - 구독자의 관계

🔹 2. Pub/Sub 실습 - UI

▪ 1) 토픽 생성

  • 가장 먼저 메세지를 발행하여 저장할 토픽을 생성해줍니다. Pub/Sub → 주제 → +주제 만들기 탭에 들어가서 토픽의 이름만 설정해주면 바로 생성이 가능합니다.

▪ 2) 구독 생성

  • 토픽을 생성하였다면 해당 토픽으로 들어오는 메세지를 받아서 소비할 구독을 생성해야 합니다.
  • Pub/Sub → 구독 → +구독 만들기 탭에서 구독 생성이 가능합니다. 구독의 유형으로는 아래 4가지를 선택할 수 있습니다.
    • 가져오기 : 메세지를 읽어오기
    • 푸시 : 메세지를 다른 Endpoint URL로 전송
    • BigQuery에 쓰기 : 메세지를 BigQuery 테이블에 저장
    • Cloud Storage에 쓰기 : 메세지를 GCS 파일 객체로 저장(TEXT or AVRO 포맷)
  • 구독 ID 와 메세지를 읽어올 토픽을 선택하고, 가장 기본이 되는 가져오기 유형으로 구독을 생성합니다.

▪ 3) 메세지 발행

  • 위에서 생성한 토픽에서 테스트 메세지 생성이 가능합니다. 아래와 같이 토픽에 들어가서 메세지 → 메세지 게시 탭을 선택하고,

    메세지 본분에 간략히 내용을 적어서 게시하면 해당 토픽에서 메세지를 발행할 수 있습니다.

▪ 4) 메세지 가져오기

  • 토픽에서 발행한 메세지는 구독에서 가져올 수 있습니다. 위에서 dev_topic을 구독하는 dev_subscription에서 발행된 메세지 확인이 가능합니다.

🔹 3. Pub/Sub 실습 - Python

  • 위에서 처럼 직관적인 GUI 환경에서 설정할 수도 있지만 Google의 Pub/Sub 관련 라이브러리를 설치하면 Python 코드로도 해당 기능 구현이 가능합니다.

▪ 1) 라이브러리 설치 및 기본 설정

  • 라이브러리 설치 → pip install google-cloud-pubsub
  • 아래 코드에서 GCP와 통신할 Client 및 토픽 객체를 생성합니다.
from google.cloud import pubsub_v1
from google.oauth2 import service_account

PROJECT_ID = "[프로젝트 ID]"
KEY_PATH = "[서비스 계정 JSON KEY 경로]"
CREDENTIALS = service_account.Credentials.from_service_account_file(KEY_PATH)

TOPIC_ID = "dev_topic"
publisher = pubsub_v1.PublisherClient(credentials=CREDENTIALS)
TOPIC_PATH = publisher.topic_path(PROJECT_ID, TOPIC_ID)

print(TOPIC_PATH)


--- 출력 결과 ---
projects/codeit-hyunsoo/topics/dev_topic

▪ 2) 토픽 생성

  • 이미 생성된 토픽이라면 pass 되도록 예외처리까지 포함하여 구성합니다.
# Pub/Sub 토픽 생성 코드
from google.api_core.exceptions import AlreadyExists

try:
    topic = publisher.create_topic(name=TOPIC_PATH)
    print(f"토픽이 생성되었습니다: {topic.name}")
except AlreadyExists:
    print(f"이미 존재하는 토픽입니다: {TOPIC_PATH}")
    
    
--- 출력 결과 ---
토픽이 생성되었습니다: projects/codeit-hyunsoo/topics/dev_topic

▪ 3) 샘플 메세지 전송

  • Faker 라이브러리를 활용해 랜덤한 문장 생성 후 dev_topic으로 전송하는 발행자를 만들어줍니다. (3초에 한 번씩 총 10개의 메세지 전송)
from faker import Faker
import time

fake = Faker()

for _ in range(10):
    msg = fake.sentence()
    future = publisher.publish(
        topic=TOPIC_PATH,
        data=msg.encode("utf-8"),
        source="app1"
    )
    print(f"보낸 메시지: {msg} / 결과: {future.result()}")
    time.sleep(3)
    
    
--- 출력 결과 ---
보낸 메시지: Include business head send friend him final. / 결과: 15936265876075595
보낸 메시지: Check example left chance approach large. / 결과: 15936256995631331
보낸 메시지: Heart still property. / 결과: 15936267081923148
보낸 메시지: Can production admit with business moment future. / 결과: 15936297693105990
보낸 메시지: Reflect national government between bag part with mission. / 결과: 15936288400415598
보낸 메시지: Meeting drive anyone note. / 결과: 15936290797590549
보낸 메시지: Particularly fact see far election. / 결과: 15936291128627334
보낸 메시지: Hair energy tax whole model head hit. / 결과: 15936290594708674
보낸 메시지: Right range trade score half. / 결과: 15936291572828675
보낸 메시지: Effect where sign popular family media. / 결과: 15936285434903098

▪ 4) 구독 생성

  • 구독 역시 토픽 생성과 비슷하게 서비스계정 JSON KEY를 기반으로 Client 및 구독 객체를 생성하고, 가져오기 유형의 구독을 python으로 생성합니다.
from google.cloud import pubsub_v1
from google.oauth2 import service_account
import time
from google.api_core.exceptions import AlreadyExists

PROJECT_ID = "[프로젝트 ID]"
KEY_PATH = "[서비스 계정 JSON KEY 경로]"
CREDENTIALS = service_account.Credentials.from_service_account_file(KEY_PATH)

# 토픽 정보
TOPIC_ID = "dev_topic"
publisher = pubsub_v1.PublisherClient(credentials=CREDENTIALS)
TOPIC_PATH = publisher.topic_path(PROJECT_ID, TOPIC_ID)

# 구독 정보
SUBSCRIPTION_ID = "dev_subscription"
subscriber = pubsub_v1.SubscriberClient(credentials=CREDENTIALS)
SUBSCRIPTION_PATH = subscriber.subscription_path(PROJECT_ID, SUBSCRIPTION_ID)

try:
    subscription = subscriber.create_subscription(
        name=SUBSCRIPTION_PATH,
        topic=TOPIC_PATH
    )
    print(f"구독이 생성되었습니다: {SUBSCRIPTION_PATH}")
except AlreadyExists:
    print(f"이미 존재하는 구독입니다: {SUBSCRIPTION_PATH}")


--- 출력 결과 ---
구독이 생성되었습니다: projects/codeit-hyunsoo/subscriptions/dev_subscription

▪ 5) 토픽에 저장된 메세지 읽기

  • dev_topic에 저장된 메세지들은 구독을 통해 읽어올 수 있습니다. 아래 코드는 1초에 한 번씩 메세지를 확인하여 출력해줍니다.
def callback(message):
    print(f"받은 메시지: {message.data.decode('utf-8')}")
    message.ack()

streaming_pull_future = subscriber.subscribe(SUBSCRIPTION_PATH, callback=callback)
print(f"구독을 시작합니다: {SUBSCRIPTION_PATH}")

try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    streaming_pull_future.cancel()
    print("구독을 중단합니다.")
finally:
    streaming_pull_future.cancel()
    subscriber.close()
    
--- 출력 결과 ---
구독을 시작합니다: projects/codeit-hyunsoo/subscriptions/dev_subscription
받은 메시지: This is Sample Message
구독을 중단합니다.

🔹 4. OUTRO

  • 지금까지 Google Cloud Pub/Sub의 기본 개념부터 실제 구현까지 단계별로 살펴보았습니다. 토픽과 프로듀서, 컨슈머 개념 등 사용해보면서 Kafka와 상당히 비슷하다는 느낌을 받을 수 있었습니다.
  • 다음 글에서는 토픽으로 들어온 메세지를 BigQuery와 GCS에 객체로 저장하는 방법에 대해 다뤄보도록 하겠습니다.
profile
데이터 엔지니어의 작업공간 / #PYTHON #CLOUD #SPARK #AWS #GCP #NCLOUD

0개의 댓글