Dataflow관련 실습을 진행할 때 Google에서 제공해주는 템플릿을 쓰는 거 아니면 그냥 특정 실습에서 쓰는 코드를 clone하여 명령어만 입력했다. 하지만 실습에서 쓰는 코드들은 항상 그 실습에만 적용되는 Transform이 포함되어 있다. 때문에 간단하고 기본적인 템플릿을 한 번 커스텀해봤다.
sudo apt-get update
# Python3이 설치되어 있다고 가정하고 아래 명령어들 시행
sudo apt-get install -y python3-venv
python3 -m venv <가상환경 이름>
source <가상환경 이름>/bin/activate
# 필요한 패키지들 설치
python3 -m pip install -q --upgrade pip setuptools wheel
python3 -m pip install apache-beam[gcp]
ps_to_bq.py
import argparse
import json
import logging
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.runners import DataflowRunner, DirectRunner
logging.basicConfig(level=logging.INFO)
logging.getLogger().setLevel(logging.INFO)
# On-prem 환경일 경우 import os를 한 후, 아래와 같이 서비스 계정 키 경로를 넣어줘야 한다.
# os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "<서비스 계정 경로>"
class CustomParsing(beam.DoFn):
""" Custom ParallelDo class to apply a custom transformation """
def to_runner_api_parameter(self, unused_context):
# Not very relevant, returns a URN (uniform resource name) and the payload
return "beam:transforms:custom_parsing:custom_v0", None
def process(self, element: bytes, timestamp=beam.DoFn.TimestampParam, window=beam.DoFn.WindowParam):
"""
Simple processing function to parse the data and add a timestamp
For additional params see:
https://beam.apache.org/releases/pydoc/2.7.0/apache_beam.transforms.core.html#apache_beam.transforms.core.DoFn
"""
parsed = json.loads(element.decode("utf-8"))
# rfc3339란 날짜, 시간을 표현함에 있어 표준이 되는 포맷이다.
parsed["timestamp"] = timestamp.to_rfc3339()
yield parsed
def run():
# Parsing arguments
# project, region, staging_location, temp_location은 필수로 지정해줘야 하는 항목이다.
parser = argparse.ArgumentParser()
parser.add_argument("--project",required=True,help="GCP 프로젝트 ID",)
parser.add_argument("--region",required=True,help="GCP 리전",)
parser.add_argument("--staging_location",required=True,help="Cloud Storage Bucket의 staging 폴더 경로",)
parser.add_argument("--temp_location",required=True,help="Cloud Storage Bucket의 temp 폴더 경로",)
parser.add_argument("--input_subscription",required=True,help="데이터를 전달할 Pub/Sub 주제의 구독 경로",)
parser.add_argument("--output_table",required=True,help="빅쿼리 테이블 경로",)
parser.add_argument("--output_schema",required=True,help="스키마 지정",)
parser.add_argument("--runner",required=True,help="Dataflow Runner 지정",)
known_args, pipeline_args = parser.parse_known_args()
# Creating pipeline options
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(StandardOptions).streaming = True
pipeline_options.view_as(GoogleCloudOptions).project = known_args.project
pipeline_options.view_as(GoogleCloudOptions).region = known_args.region
pipeline_options.view_as(GoogleCloudOptions).staging_location = known_args.staging_location
pipeline_options.view_as(GoogleCloudOptions).temp_location = known_args.temp_location
pipeline_options.view_as(StandardOptions).runner = known_args.runner
# Defining our pipeline and its steps
with beam.Pipeline(options=pipeline_options) as p:
(
p
| "ReadFromPubSub" >> beam.io.gcp.pubsub.ReadFromPubSub(
subscription=known_args.input_subscription, timestamp_attribute=None
)
| "CustomParse" >> beam.ParDo(CustomParsing())
| "WriteToBigQuery" >> beam.io.WriteToBigQuery(
known_args.output_table,
schema=known_args.output_schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
# create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
# write_disposition 옵션 = 기존 테이블에 데이터 추가
# creat_disposition 옵션 = 빅쿼리에 테이블이 없다고 하면 여기서 명시해주는 스키마대로 필요하면 테이블을 생성
)
)
if __name__ == "__main__":
run()
실행
python ps_to_bq.py --streaming \
--project=<프로젝트 ID> \
--region=<리전> \
--staging_location=gs://<버킷명>/staging \
--temp_location=gs://<버킷명>/temp \
--input_subscription=projects/<프로젝트 ID>/subscriptions/<구독명> \
--output_table=<프로젝트 ID>:<데이터세트명>.<테이블명> \
--output_schema="timestamp:TIMESTAMP,<숫자 변수>:FLOAT,<문자 변수>:STRING" \
--runner=DataflowRunner \
--job_name=<Dataflow Job 이름>
콘솔에서 Pub/Sub 주제에 직접 메시지를 게시해줄 수도 있지만 원하는 갯수만큼 랜덤하게 데이터를 생성해서 Pub/Sub으로 보낼 파이썬 파일을 만들 수 있다.
ps_publisher_emulator.py
import json
import time
from datetime import datetime
from random import random
from google.auth import jwt
from google.cloud import pubsub_v1
# --- Base variables and auth path
CREDENTIALS_PATH = "<GCP 서비스 계정 키 경로>"
PROJECT_ID = "<프로젝트 ID>"
TOPIC_ID = "<주제 이름>"
MAX_MESSAGES = <Pub/Sub으로 보내고 싶은 데이터 개수>
# --- PubSub Utils Classes
class PubSubPublisher:
def __init__(self, credentials_path, project_id, topic_id):
credentials = jwt.Credentials.from_service_account_info(
json.load(open(credentials_path)),
audience="https://pubsub.googleapis.com/google.pubsub.v1.Publisher"
)
self.project_id = project_id
self.topic_id = topic_id
self.publisher = pubsub_v1.PublisherClient(credentials=credentials)
self.topic_path = self.publisher.topic_path(self.project_id, self.topic_id)
def publish(self, data: str):
result = self.publisher.publish(self.topic_path, data.encode("utf-8"))
return result
# --- Main publishing script
# 설정해준 MAX_MESSAGES 갯수만큼 랜덤하게 메시지가 생성되어서 Pub/Sub으로 보내진다.
def main():
i = 0
publisher = PubSubPublisher(CREDENTIALS_PATH, PROJECT_ID, TOPIC_ID)
while i < MAX_MESSAGES:
data = {
"<FLOAT 유형의 컬럼>": random(),
"<STRING 유형의 컬럼>": f"Hi-{datetime.now()}"
}
publisher.publish(json.dumps(data))
time.sleep(random())
i += 1
if __name__ == "__main__":
main()
실행
python ps_publisher_emulator.py
[Dataflow 커스텀 템플릿 Pub/Sub to BigQuery 참고1]
https://www.cloudskillsboost.google/course_templates/229
[Dataflow 커스텀 템플릿 Pub/Sub to BigQuery 참고2]
https://medium.com/codex/a-dataflow-journey-from-pubsub-to-bigquery-68eb3270c93