Pipeline을 작성해보자

Yeon Seong Hwang·2024년 5월 6일
0

Apache Beam

목록 보기
2/2

📌 Pipeline이란

Pipeline은 데이터 처리 작업을 나타내는 directed acyclic graph이며, 사용자는 데이터를 읽고 변환하고 쓰는 등 다양한 데이터 작업을 수행할 수 있다.

  • PTransform: pipeline의 데이터 처리 작업(단계)를 나타내며, 각 단계의 출력은 다음 단계의 입력이 된다. textio.read, map등 일반적으로 제공하는 method도 있지만, 사용자가 직접 정의할 수도 있다
  • PCollection: 분산된 데이터 stream이며, PTransform에 의해 생성되고 pipeline의 각 단계로 전달된다

simple

위처럼 선형적으로 데이터를 읽고 출력하도록 pipeline을 구성하는 것뿐만 아니라, 아래와 같이 PCollection을 분기하여 여러 PTransform에 의해 처리되도록 pipeline을 구성할 수도 있다.
pipeline


📌 Pipeline 생성하기

Apache Beam pipeline을 작성하기 위해서는 먼저 Apache Beam SDK를 설치 해야 한다.

# 기본 sdk, local 또는 다른 cloud 환경에서 apache beam을 사용할 때 설치하자
$ pip install apache-beam
# GCP와 관련된 기능을 포함한 sdk이다. GCS, BigQuery, PubSub등과 연동하기 위한 PTransform 등을 포함한다
$ pip install apache-beam[gcp]

다음과 같은 pipeline을 작성한다고 가정해보자.

  • step1: GCP Pub/Sub으로부터 data를 읽어온다
  • step2: func1을 실행한다
  • step3: func2를 실행한다
  • step4: data를 GCS에 write한다

각 항목은 pipeline이 처리할 로직의 순서인 step이며, 각 step은 pipe(|)로 연결하여 정의할 수 있다.

# pipeline.py
from abc import ABCMeta
import apache_beam as beam

class Pipeline(metaclass=ABCMeta):

    def run() -> None:
        with beam.Pipeline() as pipeline:
            # '|': 각 step을 구분한다
            (
                pipeline
                | 'Read from Pub/Sub' >> beam.io.ReadFromPubSub(subscription='subscription-name')
                | 'Execute func1' >> beam.Map(func1)
                | 'Execute func2' >> beam.Map(func2)
                | 'Write data to GCS' >> beam.io.WriteToText(file_path_prefix='/dest/path')
            )

그리고, main을 작성하여 pipeline을 실행하면 끝이다.

# main.py
if __name__ == '__main__':
    pipeline = Pipeline()
    pipeline.run()

🔗 참고자료

profile
온 몸으로 기억하기 위해 기록합니다. 🌱

0개의 댓글