Dataflow(Apache Beam) 간단 문법

김민형·2022년 9월 20일
1

GCP - Data

목록 보기
12/44

Dataflow는 Apache Beam을 기반으로 한다.

Python 말고 다른 언어(Java, Go)를 알고 싶으면 Apache Beam 프로그래밍 가이드 -> 참고

아파치 빔을 기반으로 하기 때문에 import 역시 'apache_beam'을 import에서 사용한다.
기본적으로는 다음과 같은 형태로 코드 작성

[output PCollection] = [Input PCollection | [라벨] >> [Transform]]

Pipeline 만들기

파이프라인은 처음부터 끝까지 전체 데이터 처리 작업을 캡슐화한다.
여기에는 입력 데이터 읽기, 해당 데이터 변환 및 출력 데이터 쓰기가 포함.
모든 데이터플로 프로그램은 파이프라인을 생성해야 한다. 파이프라인을 생성할 때 파이프라인에 실행할 위치왑 방법을 알려주는 실행 옵션도 지정해야 한다.

  • Pipeline Option 설정 없이 생성하는 방법
import apache_beam as beam
pipeline = beam.pipeline()
  • Pipeline Option 설정과 함께 생성하는 방법
pipeline = beam.pipeline(Options=PipelineOptions())

PCollection 만들기

PCollection은 Pipeline으로 데이터를 읽거나 내보낼 때 사용.
Apache Beam에서 제공하는 I/O 어댑터는 PCollection으로 반환한다.

import apache_beam as beam
pipeline = beam.Pipeline()

# 'ReadMyFile'은 PCollection 이름
pcollection = pipeline | 'ReadMyFile' >> beam.io.ReadFromText('gs://<input 파일 경로>')

Python 자체의 list 등을 PCollection으로 만드려면 아래과 같이 beam.Create()를 사용하면 된다.

import apache_beam as beam
test_list = [0,1,2,3,4,5,6,7,8,9]
test_pcollection = beam.Create(test_list)

Transform 이용하기

Transform은 입력받은 데이터를 변환하는 과정으로 하나 이상의 PCollection으로부터 받은 데이터를 Transform을 이용하여 처리하게 된다. 처리된 결과도 PCollection으로 출력해야 한다.

  • 하나의 Input PCollection에 1개의 Transform을 이용해 Output PCollection을 만드는 방법
[Output PCollection] = [Input PCollection] | [Transform]
  • 하나의 Input PCollection에 여러개의 Transform을 이용해 Output PCollection을 만드는 방법
[Output PCollection] = [Input PCollection] | [Transform 1] | [Transform 2]

ParDo

ParDo는 사용자가 원하는 코드를 ParDo를 통해 작성을 하면 거기에 맞춰 데이터가 변환하여 하나 이상의 출력 PCollection을 반환할 수 있다.
ParDo를 이용하면 아래와 같은 처리를 할 수 있다.

  • 필터링
  • 다른 자료형으로 변환
  • 데이터 세트에서 각 요소 추출
  • 데이터 세트의 개별 연산

ParDo는 파이프라인에서 일반적인 중간 단계로 데이터를 변환할 때 사용.
ParDo변환을 적용할 때는 DoFn 객체의 형태로 사용자 코드를 제공해야 한다.
아래 코드에서 ComputeWordLengthFn은 beam.DoFn을 상속받아서 process()함수 부분에 처리할 ㅋ모드를 이용해 데이터를 변환할 수 있다. 이렇게 하면 beam.ParDo() 부분에서 위에서 만들어 놓은 beam.DoFn 객체를 사용할 수 있다.

import apache_beam as beam
pipeline = beam.pipeline()

#GCS로부터 데이터를 읽어옴
input_pcollection = pipeline | 'ReadMyFile' >> beam.io.ReadFromText('gs://<input 파일 경로>')

#ParDo를 사용하기 위해 사용자가 직접 DoFn의 상속을 받아 코드를 작성
class ComputeWordLengthFn(beam.DoFn):
    def process(self, element):
        return [len(element)]

word_length = input_pcollection | beam.ParDo(computeWordLengthFn())

# 람다를 사용하면 class를 안쓰고 밑의 한 줄이면 된다.
word_length = input_pcollection | beam.Map(lambda: len: [element])

GroupByKey

GroupByKey는 Key-Value 형태로 되이있는 컬렉션을 처리하기 위한 Transform.
Key 기준으로 Value를 하나로 묶는데 사용

import apache_beam as beam

sample_text = [
    ('cat',1),
    ('dog', 5),
    ('and', 1),
    ('cat', 5),
    ('and', 2)
    ...
]

grouping = sample_text | beam.GroupByKey()
print(grouping)

# 결과
# [('and', [1,2]), ('cat', [1,2]), ('dog', [5])]

CoGroupByKey

이것 역시 Key기반으로 값을 합쳐주는데, 동일한 Key의 데이터 세트가 여러개 있을 때 사용하는 것이 좋다.

import apache_beam as beam

emails_list = [
    ('a', 'a@ex.com'),
    ('b', 'b@ex.com'),
    ('c', 'c@ex.com'),
    ('d', 'd@ex.com'),
]

phones_list = [
    ('a', '111-2222-3333'),
    ('e', '222-3333-4444'),
    ('a', '333-4444-5555'),
    ('b', '444-5555-6666'),
]

location_list = [
    ('a', 'Seoul'),
    ('e', 'Busan'),
    ('e', 'Goyang'),
    ('d', 'Suwon'),
]

results = ({'phones': 'phones_list', 'emails': 'emails_list', 'location': 'location_list'} | beam.CoGroupByKey())
for result in results:
    print(result)

# 결과
# ('a', {'phones': ['111-2222-3333', '333-4444-5555'], 'emails': ['a@ex.com'], 'location': [Seoul]})
# ('a', {'phones': ['222-3333-4444'], 'emails': [], 'location': [Busan, Goyang]})
# ...

Combine

Combine은 데이터의 요소 및 값의 컬렉션을 결합하기 위해 사용하는 Transform. 안에는 작동할 함수를 만들어서 전달해줘야 한다.

import apache_beam as beam
sample_data = [1,10,100,1000]
def sum_list(values):
    return sum(values)

result = sample_data | beam.CombineGlobally(sum_list)
print(result)

# 결과
# [1111]

Flatten

여러 종류의 PCollection 객체를 하나의 PCollection 객체로 합친다.

import apache_beam as beam
PCollection1 = [1,2,3]
PCollection2 = [4,5,6]
PCollection3 = ['min', 'hyoung', 'kim']

flatted = ((PCollection1), (PCollection2), (PCollection3) | beam.Flatten())
print(flatted)

# 결과
# [1,2,3,4,5,6,'min','hyoung','kim']

Pipeline I/O

파이프라인을 생성할 때 외부 소스(파일이나, 데이터베이스 등)으로부터 데이터를 읽어오거나, 처리한 데이터를 외부로 저장할 때, Pipeline I/O를 이용해 저장할 수 있다.

  • GCS에서 텍스트 파일을 읽어오기
lines = pipeline | beam.io.ReadFromText('gs://<txt 파일 경로>')
  • GCS에 파일 저장
output = beam.io.WriteToText('gs://<저장할 경로>')

Windowing

  • 텀블링 윈도우(Fixed Window)
    아래 예시는 윈도우 길이가 60초인 텀블링 윈도우
import apache_beam as beam
from apache_beam.transforms import window
fixed_window = items | 'fixed_window' >> beam.WindowInto(window.FixedWindows(60))
  • 홉핑 윈도우(Sliding Window)
    아래 예시는 윈도우 길이는 30초, 5초마다 윈도우가 발생하도록 설정한 것
import apache_beam as beam
from apache_beam.transforms import window
fixed_window = items | 'sliding_window' >> beam.WindowInto(window.SlidingWindows(30,5))
  • 세션 윈도우
    아래 예시는 세션 시간을 10분으로 설정
import apache_beam as beam
from apache_beam.transforms import window
session_window = items | 'session_window' >> beam.WindowInto(window.SessionWindows(10,60))

Trigger

트리거는 윈도우의 중간 집계를 위해 사용
먼저, 워터마크라는 개념을 알아야 한다.

워트마크란?
실제 데이터들이 발생한 시간과 서버에 도착하는 시간 사이에는 차이가 발생할 수 있기 때문에 어느 시점까지 데이터를 기다렸다가 처리해야하나? 하는 문제가 있을 수 있다.
이때 실제 데이터가 도착하는 시간을 예측해야 하는데 이를 워터마크라고 한다.
워터마크를 기반으로 Window의 시작, 종료 시간을 예측하고 설정한다.

  • AfterWatermark()는 워터마크가 윈도우의 끝을 통과할 때 트리거링 되는 트리거
from apache_beam.transforms.trigger import AfterWatermark
AfterWatermark()
  • AfterProcessingTime()으로 데이터가 수신된 후 일정한 처리 시간이 지난 후 트리거링(Time trigger)
from apache_beam.transforms.trigger import AfterProcessingTime
AfterProcessingTime(delay=1*60) # 1분
  • AfterCount()로 데이터의 개수를 가지고 트리거링(Element count trigger)
from apache_beam.transforms.trigger import AfterCount
AfterCount(32) # 32개
  • Accumulation_mode를 사용하여 트리거링
    Fixed Window와 AfterProcessingTime() 트리거링과 함께 누적하지 않는 설정

    • AccumulationMode.ACCUMULATING -> 누적o
    • AccumulationMode.DISCARDING -> 누적x
import apache_beam as beam
from apache_beam import WindowInto
from apache_beam.transforms.trigger import AfterProcessingTime, AccumulationMode
from apache_beam.transforms.window import FixedWindows

PCollection1 | WindowInto(
    FixedWindows(1*60), # 1분
    trigger=AfterProcessingTime(10*60), # 10분
    accumulation_mode=AccumualtionMode.DISCARDING
)
profile
Solutions Architect (rlaalsgud97@gmail.com)

0개의 댓글