
토픽으로 전송되는 메세지를 저장할 수 있는 빅쿼리 테이블을 생성합니다. 이 때 구독 유형 중 스키마 사용 안함 옵션을 선택하게 되면 data라는 컬럼 하나가 있는 테이블의 row에 토픽에서 읽은 메세지가 저장됩니다.

BigQuery의 bq_data 데이터셋에 data 컬럼 하나만 있는 pubsub_tbl_simple 테이블을 생성합니다.

테이블 스키마 사용 옵션을 선택하면, 생성한 BigQuery 테이블의 스키마에 맞춰 토픽으로 데이터를 전송할 수 있고, 구독은 해당 데이터를 읽어 BigQuery 테이블에 적재합니다.
id int64
name object
year int64
gender object
count int64pubsub_tbl_names BigQuery 테이블을 생성합니다.
가져오기 : 메세지를 읽어오기푸시 : 메세지를 다른 Endpoint URL로 전송BigQuery에 쓰기 : 메세지를 BigQuery 테이블에 저장Cloud Storage에 쓰기 : 메세지를 GCS 파일 객체로 저장그 중 BigQuery에 쓰기 유형의 구독을 생성하여 토픽에 들어오는 데이터를 실시간으로 읽어와 BigQuery 테이블에 저장해보도록 하겠습니다.
Google Pub/Sub 서비스의 토픽·발행자·구독자의 개념 및 토픽 생성 방법은 이전 글(링크)을 참고하시면 됩니다.
생성할 구독은 총 2가지로,
1) 단일 스키마 테이블인 pubsub_tbl_simple로 전송하는 구독
2) 커스텀 스키마 테이블인 pubsub_tbl_names로 전송하는 구독
이렇게 생성합니다.
data라는 컬럼 하나를 가진 pubsub_tbl_simple 테이블로 데이터를 전송하는 구독을 생성해보겠습니다.dev_bq_subscriptionBigQuery에 쓰기스키마 사용 안함
pubsub_tbl_names 테이블로 데이터를 전송하는 구독을 생성합니다.dev_bq_names_subscriptionBigQuery에 쓰기테이블 스키마 사용
토픽에 들어온 데이터를 받아 각 테이블들로 저장해줄 구독 2개 생성이 완료되었습니다.


dev_topic 토픽에 전송하였을 때, 빅쿼리 테이블들에 어떤식으로 데이터가 적재되는지 확인해보도록 하겠습니다.dev_topic 토픽으로 데이터가 3초에 한 번씩 전송되도록 합니다.import pandas as pd
from google.cloud import pubsub_v1
from google.oauth2 import service_account
import json, time
from faker import Faker
## 1. PubSub 토픽 관련 설정
PROJECT_ID = "[프로젝트 ID]"
KEY_PATH = "[서비스 계정 JSON KEY 경로]"
CREDENTIALS = service_account.Credentials.from_service_account_file(KEY_PATH)
TOPIC_ID = "dev_topic"
publisher = pubsub_v1.PublisherClient(credentials=CREDENTIALS)
TOPIC_PATH = publisher.topic_path(PROJECT_ID, TOPIC_ID)
# 2. 보낼 데이터셋을 읽어 JSON 형식으로 변환
df = pd.read_csv('names.csv')
df_dict = df.to_dict(orient='records')
# 3. dev_topic으로 데이터를 3초에 한 번씩 전송
for row in df_dict[:10]:
future = publisher.publish(
topic=TOPIC_PATH,
data=json.dumps(row).encode("utf-8")
)
print(f"보낸 메시지: {row} / 결과: {future.result()}")
time.sleep(3)
--- 출력 결과 ---
보낸 메시지: {'id': 1, 'name': 'Mary', 'year': 1880, 'gender': 'F', 'count': 7065} / 결과: 16021080392424521
보낸 메시지: {'id': 2, 'name': 'Anna', 'year': 1880, 'gender': 'F', 'count': 2604} / 결과: 16021545983015478
보낸 메시지: {'id': 3, 'name': 'Emma', 'year': 1880, 'gender': 'F', 'count': 2003} / 결과: 16022743523116683
보낸 메시지: {'id': 4, 'name': 'Elizabeth', 'year': 1880, 'gender': 'F', 'count': 1939} / 결과: 16022824001573059
보낸 메시지: {'id': 5, 'name': 'Minnie', 'year': 1880, 'gender': 'F', 'count': 1746} / 결과: 16022962397816275
보낸 메시지: {'id': 6, 'name': 'Margaret', 'year': 1880, 'gender': 'F', 'count': 1578} / 결과: 16022437774262030
보낸 메시지: {'id': 7, 'name': 'Ida', 'year': 1880, 'gender': 'F', 'count': 1472} / 결과: 16021982500229449
보낸 메시지: {'id': 8, 'name': 'Alice', 'year': 1880, 'gender': 'F', 'count': 1414} / 결과: 16022962752184659
보낸 메시지: {'id': 9, 'name': 'Bertha', 'year': 1880, 'gender': 'F', 'count': 1320} / 결과: 16022046604942106
보낸 메시지: {'id': 10, 'name': 'Sarah', 'year': 1880, 'gender': 'F', 'count': 1288} / 결과: 16022147167489566
pubsub_tbl_simple 테이블의 경우 전송한 JSON 형식의 데이터가 data라는 단일 컬럼 안에 STRING 형식으로 저장되는 것을 확인할 수 있습니다. 이렇게 스키마 사용 안함 유형의 구독은 추후 분석을 위해 테이블에 대한 추가적인 가공이 필요하며, 정형 데이터보다는 비정형이나 TEXT 형태의 데이터를 실시간으로 저장할 때 유용하게 쓰일 수 있을 것 같습니다.
pubsub_tbl_names 테이블의 경우 테이블 스키마에 맞게 데이터를 전송하면 각 컬럼에 해당 값이 실시간으로 저장되어 분석에 용이한 정형 데이터 테이블 형태로 관리할 수 있습니다.