Pipeline은 데이터 처리 작업을 나타내는 directed acyclic graph이며, 사용자는 데이터를 읽고 변환하고 쓰는 등 다양한 데이터 작업을 수행할 수 있다.
textio.read, map등 일반적으로 제공하는 method도 있지만, 사용자가 직접 정의할 수도 있다
위처럼 선형적으로 데이터를 읽고 출력하도록 pipeline을 구성하는 것뿐만 아니라, 아래와 같이 PCollection을 분기하여 여러 PTransform에 의해 처리되도록 pipeline을 구성할 수도 있다.
Apache Beam pipeline을 작성하기 위해서는 먼저 Apache Beam SDK를 설치 해야 한다.
# 기본 sdk, local 또는 다른 cloud 환경에서 apache beam을 사용할 때 설치하자
$ pip install apache-beam
# GCP와 관련된 기능을 포함한 sdk이다. GCS, BigQuery, PubSub등과 연동하기 위한 PTransform 등을 포함한다
$ pip install apache-beam[gcp]
다음과 같은 pipeline을 작성한다고 가정해보자.
각 항목은 pipeline이 처리할 로직의 순서인 step이며, 각 step은 pipe(|)로 연결하여 정의할 수 있다.
# pipeline.py
from abc import ABCMeta
import apache_beam as beam
class Pipeline(metaclass=ABCMeta):
def run() -> None:
with beam.Pipeline() as pipeline:
# '|': 각 step을 구분한다
(
pipeline
| 'Read from Pub/Sub' >> beam.io.ReadFromPubSub(subscription='subscription-name')
| 'Execute func1' >> beam.Map(func1)
| 'Execute func2' >> beam.Map(func2)
| 'Write data to GCS' >> beam.io.WriteToText(file_path_prefix='/dest/path')
)
그리고, main을 작성하여 pipeline을 실행하면 끝이다.
# main.py
if __name__ == '__main__':
pipeline = Pipeline()
pipeline.run()