🔹 0. INTRO
- 현대 소프트웨어 아키텍처에서 시스템 간의 효율적인 통신은 필수적입니다. 특히 마이크로서비스 환경에서는 각 서비스가 독립적으로 동작하면서도 서로 간의 데이터를 주고받아야 하는 상황이 자주 발생합니다. 이 때 사용할 수 있는 개념으로 Queue 라는 자료구조가 있습니다.
Queue를 통해 우리는 시스템 간의 느슨한 결합 가지는 아키텍처를 구성할 수 있습니다.
우리에게 익숙한 AWS 클라우드에서 가장 먼저 런칭된 서비스가 AWS SQS라고 하는 비동기 메세징 서비스였습니다. GCP에도 이와 동일한 서비스가 있는데 바로 'Pub/Sub'이라는 서비스입니다.
이번 글에서는 Google Cloud Pub/Sub 서비스의 개념에 대해 알아보고 python을 활용하여 어떻게 다룰 수 있는지 실습해보도록 하겠습니다.
🔹 1. Google Cloud Pub/Sub이란?
- Google Cloud Pub/Sub은 완전 관리형의 비동기 메시징 서비스로, 발행자(Publisher)와 구독자(Subscriber) 간의 메시지 전달을 할 수 있습니다. 전통적인 동기식 통신 방식과 달리, 메시지를 보내는 쪽과 받는 쪽이 서로를 직접 기다리지 않고도 데이터를 교환할 수 있게 하여 서비스간의 느슨한 결합을 가능하게 해줍니다.

▪ 핵심 개념 설명
토픽(Topic)
- 메세지가 저장되는 저장소로, 발행자가 보낸 메세지를 임시로 보관하는 역할을 합니다.
- 저장되는 메세지의 종류에 따라 여러 토픽 생성이 가능하며, GCP 프로젝트 내에서는 고유한 이름으로 식별되어야 합니다.
발행자(Publisher)
- 데이터를 생성하고, 이 생성된 데이터를 특정 토픽에 전송하는 역할을 하는 컴포넌트입니다.
구독자(Subscriber)
- 특정 토픽을 구독하여 발행자가 보낸 메세지를 수신하고 처리하는 역할을 담당하는 컴포넌트입니다.
- 단순히 메세지를 읽어올 수도 있고, 읽어온 메세지를 BigQuery나 GCS에 저장하도록 설정할 수도 있습니다.
발행자 - 토픽 - 구독자의 관계

🔹 2. Pub/Sub 실습 - UI
▪ 1) 토픽 생성
- 가장 먼저 메세지를 발행하여 저장할 토픽을 생성해줍니다.
Pub/Sub → 주제 → +주제 만들기 탭에 들어가서 토픽의 이름만 설정해주면 바로 생성이 가능합니다.

▪ 2) 구독 생성
- 토픽을 생성하였다면 해당 토픽으로 들어오는 메세지를 받아서 소비할 구독을 생성해야 합니다.
Pub/Sub → 구독 → +구독 만들기 탭에서 구독 생성이 가능합니다. 구독의 유형으로는 아래 4가지를 선택할 수 있습니다.
가져오기 : 메세지를 읽어오기
푸시 : 메세지를 다른 Endpoint URL로 전송
BigQuery에 쓰기 : 메세지를 BigQuery 테이블에 저장
Cloud Storage에 쓰기 : 메세지를 GCS 파일 객체로 저장(TEXT or AVRO 포맷)
구독 ID 와 메세지를 읽어올 토픽을 선택하고, 가장 기본이 되는 가져오기 유형으로 구독을 생성합니다.

▪ 3) 메세지 발행
- 위에서 생성한 토픽에서 테스트 메세지 생성이 가능합니다. 아래와 같이 토픽에 들어가서
메세지 → 메세지 게시 탭을 선택하고,

메세지 본분에 간략히 내용을 적어서 게시하면 해당 토픽에서 메세지를 발행할 수 있습니다.

▪ 4) 메세지 가져오기
- 토픽에서 발행한 메세지는 구독에서 가져올 수 있습니다. 위에서
dev_topic을 구독하는 dev_subscription에서 발행된 메세지 확인이 가능합니다.

🔹 3. Pub/Sub 실습 - Python
- 위에서 처럼 직관적인 GUI 환경에서 설정할 수도 있지만 Google의 Pub/Sub 관련 라이브러리를 설치하면 Python 코드로도 해당 기능 구현이 가능합니다.
▪ 1) 라이브러리 설치 및 기본 설정
- 라이브러리 설치 →
pip install google-cloud-pubsub
- 아래 코드에서 GCP와 통신할 Client 및 토픽 객체를 생성합니다.
from google.cloud import pubsub_v1
from google.oauth2 import service_account
PROJECT_ID = "[프로젝트 ID]"
KEY_PATH = "[서비스 계정 JSON KEY 경로]"
CREDENTIALS = service_account.Credentials.from_service_account_file(KEY_PATH)
TOPIC_ID = "dev_topic"
publisher = pubsub_v1.PublisherClient(credentials=CREDENTIALS)
TOPIC_PATH = publisher.topic_path(PROJECT_ID, TOPIC_ID)
print(TOPIC_PATH)
--- 출력 결과 ---
projects/codeit-hyunsoo/topics/dev_topic
▪ 2) 토픽 생성
- 이미 생성된 토픽이라면 pass 되도록 예외처리까지 포함하여 구성합니다.
from google.api_core.exceptions import AlreadyExists
try:
topic = publisher.create_topic(name=TOPIC_PATH)
print(f"토픽이 생성되었습니다: {topic.name}")
except AlreadyExists:
print(f"이미 존재하는 토픽입니다: {TOPIC_PATH}")
--- 출력 결과 ---
토픽이 생성되었습니다: projects/codeit-hyunsoo/topics/dev_topic
▪ 3) 샘플 메세지 전송
Faker 라이브러리를 활용해 랜덤한 문장 생성 후 dev_topic으로 전송하는 발행자를 만들어줍니다. (3초에 한 번씩 총 10개의 메세지 전송)
from faker import Faker
import time
fake = Faker()
for _ in range(10):
msg = fake.sentence()
future = publisher.publish(
topic=TOPIC_PATH,
data=msg.encode("utf-8"),
source="app1"
)
print(f"보낸 메시지: {msg} / 결과: {future.result()}")
time.sleep(3)
--- 출력 결과 ---
보낸 메시지: Include business head send friend him final. / 결과: 15936265876075595
보낸 메시지: Check example left chance approach large. / 결과: 15936256995631331
보낸 메시지: Heart still property. / 결과: 15936267081923148
보낸 메시지: Can production admit with business moment future. / 결과: 15936297693105990
보낸 메시지: Reflect national government between bag part with mission. / 결과: 15936288400415598
보낸 메시지: Meeting drive anyone note. / 결과: 15936290797590549
보낸 메시지: Particularly fact see far election. / 결과: 15936291128627334
보낸 메시지: Hair energy tax whole model head hit. / 결과: 15936290594708674
보낸 메시지: Right range trade score half. / 결과: 15936291572828675
보낸 메시지: Effect where sign popular family media. / 결과: 15936285434903098
▪ 4) 구독 생성
- 구독 역시 토픽 생성과 비슷하게 서비스계정 JSON KEY를 기반으로 Client 및 구독 객체를 생성하고,
가져오기 유형의 구독을 python으로 생성합니다.
from google.cloud import pubsub_v1
from google.oauth2 import service_account
import time
from google.api_core.exceptions import AlreadyExists
PROJECT_ID = "[프로젝트 ID]"
KEY_PATH = "[서비스 계정 JSON KEY 경로]"
CREDENTIALS = service_account.Credentials.from_service_account_file(KEY_PATH)
TOPIC_ID = "dev_topic"
publisher = pubsub_v1.PublisherClient(credentials=CREDENTIALS)
TOPIC_PATH = publisher.topic_path(PROJECT_ID, TOPIC_ID)
SUBSCRIPTION_ID = "dev_subscription"
subscriber = pubsub_v1.SubscriberClient(credentials=CREDENTIALS)
SUBSCRIPTION_PATH = subscriber.subscription_path(PROJECT_ID, SUBSCRIPTION_ID)
try:
subscription = subscriber.create_subscription(
name=SUBSCRIPTION_PATH,
topic=TOPIC_PATH
)
print(f"구독이 생성되었습니다: {SUBSCRIPTION_PATH}")
except AlreadyExists:
print(f"이미 존재하는 구독입니다: {SUBSCRIPTION_PATH}")
--- 출력 결과 ---
구독이 생성되었습니다: projects/codeit-hyunsoo/subscriptions/dev_subscription
▪ 5) 토픽에 저장된 메세지 읽기
dev_topic에 저장된 메세지들은 구독을 통해 읽어올 수 있습니다. 아래 코드는 1초에 한 번씩 메세지를 확인하여 출력해줍니다.
def callback(message):
print(f"받은 메시지: {message.data.decode('utf-8')}")
message.ack()
streaming_pull_future = subscriber.subscribe(SUBSCRIPTION_PATH, callback=callback)
print(f"구독을 시작합니다: {SUBSCRIPTION_PATH}")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
streaming_pull_future.cancel()
print("구독을 중단합니다.")
finally:
streaming_pull_future.cancel()
subscriber.close()
--- 출력 결과 ---
구독을 시작합니다: projects/codeit-hyunsoo/subscriptions/dev_subscription
받은 메시지: This is Sample Message
구독을 중단합니다.
🔹 4. OUTRO
- 지금까지 Google Cloud Pub/Sub의 기본 개념부터 실제 구현까지 단계별로 살펴보았습니다. 토픽과 프로듀서, 컨슈머 개념 등 사용해보면서 Kafka와 상당히 비슷하다는 느낌을 받을 수 있었습니다.
- 다음 글에서는 토픽으로 들어온 메세지를 BigQuery와 GCS에 객체로 저장하는 방법에 대해 다뤄보도록 하겠습니다.