GCP(Google Cloud Platform) Dataflow & Apache Beam

overFlowoong·2024년 10월 28일

MLOps

목록 보기
2/4
post-thumbnail

1. Apache Beam 설치

$ pip install apache-beam[gcp]

2. 데이터 처리

2-1. 클라우드 저장소 내의 데이터 처리(Dataflow 에서 실행)

  • input.txt 에는 일반 sentence 가 담겨있고 이를 각 word의 빈도를 나타내는 output.txt 를 반환
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions
import os

def run():
		"""
		bucket이 생성되어 있어야 함.
		
		bucket_id
		      ㄴ input.txt (처리하고자 하는 입력 데이터)
		"""
    # Google Cloud 인증 (서비스 계정 키 파일 경로 설정)
    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "Google Cloud 서비스 계정 키 파일"

    # 파이프라인 옵션 설정
    options = PipelineOptions()

    # GCP 옵션 설정
    google_cloud_options = options.view_as(GoogleCloudOptions)
    google_cloud_options.project = "Google Cloud 프로젝트 ID"  # Google Cloud 프로젝트 ID 설정
    google_cloud_options.region = "Dataflow를 실행할 지역"  # Dataflow 실행 지역 설정 (예: 'us-central1')
    google_cloud_options.staging_location = "gs://bucket이름/staging"  # 스테이징 파일 경로
    google_cloud_options.temp_location = "gs://bucket이름/temp"  # 임시 파일 경로
    options.view_as(StandardOptions).runner = "DataflowRunner"  # Dataflow에서 실행

    # 파이프라인 정의
    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            | 'ReadText' >> beam.io.ReadFromText('gs://bucket이름/input.txt')  # 입력 텍스트 파일
            | 'SplitWords' >> beam.FlatMap(lambda line: line.split())  # 각 라인을 단어로 나누기
            | 'PairWordsWithOne' >> beam.Map(lambda word: (word, 1))  # 각 단어를 (word, 1) 쌍으로 변환
            | 'CountWords' >> beam.CombinePerKey(sum)  # 같은 단어의 빈도 합산
            | 'WriteResults' >> beam.io.WriteToText('gs://bucket이름/output.txt')  # 결과를 텍스트 파일로 저장
        )

if __name__ == "__main__":
    run()

해당 스크립트를 실행하면 입력 데이터가 처리된 output.txt 파일이 작성됨

2-2. Pub/Sub 을 활용한 데이터 스트리밍 (Dataflow 에서 실행)

  • 먼저 원하는 topic(주제) 에 구독을 생성하여 연결 (구독 생성은 Google Cloud ConSole Web UI 에서도 가능)
from google.cloud import pubsub_v1
import os

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "Google Cloud 서비스 계정 키 파일"

# Google Cloud 프로젝트 ID
project_id = "프로젝트 ID"
# 구독할 주제 이름
topic_id = "구독할 topic 이름"
# 구독의 이름
subscription_id = "생성하고자 하는 구독 이름"

# Pub/Sub 클라이언트 생성
subscriber = pubsub_v1.SubscriberClient()

# 구독의 경로 (구독 경로는 "projects/{project_id}/subscriptions/{subscription_id}" 형식)
subscription_path = subscriber.subscription_path(project_id, subscription_id)
topic_path = f"projects/{project_id}/topics/{topic_id}"

# 구독 생성
subscription = subscriber.create_subscription(
    name=subscription_path,
    topic=topic_path,
    ack_deadline_seconds=30  # 메시지를 처리하고 승인할 수 있는 시간 (본 포스트에서는 30초로 설정)
)

print(f"Subscription created: {subscription}")

  • 해당 스크립트를 통해 Dataflow 에서 Pub/Sub 을 통한 데이터 스트리밍 (해당 스크립트는 1분 단위로 응답 코드(response_code) 별 요청 개수 집계하는 스크립트)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from apache_beam.transforms.window import FixedWindows
import json
import os
# import logging

# # 로그 레벨 설정
# logging.basicConfig(level=logging.INFO)

def parse_log_line(log_line):
    """
    Args
	    log_line: 처리하고자 하는 byte 입력
    
    Return
	    log line을 디코딩 하여 필요한 데이터만 담은 딕셔너리
    """
    try:
        if isinstance(log_line, bytes):
            log_line = log_line.decode('utf-8')
        log_data = json.loads(log_line)
        # logging.info(f"Parsed log data: {log_data}")  # 로그 메시지 기록
        return {
            'ip': log_data['ip'],
            'timestamp': log_data['timestamp'],
            'response_code': log_data['response_code']
        }
    except json.JSONDecodeError:
        return None

def run():
    # Google Cloud 인증 (서비스 계정 키 파일 경로 설정)
    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "Google Cloud 서비스 계정 키 파일"

    # 파이프라인 옵션 설정
    options = PipelineOptions()

    # GCP 옵션 설정
    google_cloud_options = options.view_as(GoogleCloudOptions)
    google_cloud_options.project = "Google Cloud 프로젝트 ID"  # Google Cloud 프로젝트 ID
    google_cloud_options.region = "Dataflow를 실행할 지역"  # Dataflow 실행 지역
    google_cloud_options.staging_location = "gs://bucket이름/staging"
    google_cloud_options.temp_location = "gs://bucket이름/temp"
    
    # 스트리밍 모드 설정
    options.view_as(StandardOptions).runner = "DataflowRunner"  # Dataflow에서 실행
    options.view_as(StandardOptions).streaming = True  # 스트리밍 모드 설정

    # 파이프라인 정의
    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            | 'ReadFromPubSub' >> ReadFromPubSub(subscription='projects/프로젝트명/subscriptions/구독명')
            | 'ParseLog' >> beam.Map(parse_log_line)
            | 'FilterValidLogs' >> beam.Filter(lambda x: x is not None)  # 유효한 로그만 필터링
            | 'ExtractResponseCode' >> beam.Map(lambda log: (log['response_code'], 1))
            | 'WindowIntoFixed' >> beam.WindowInto(FixedWindows(60))  # 1분 간격의 윈도 적용
            | 'CountPerResponseCode' >> beam.CombinePerKey(sum)  # 응답 코드별 집계
            | 'PrepareForBigQuery' >> beam.Map(lambda record: {'response_code': record[0], 'count': record[1]})
            | 'WriteToBigQuery' >> WriteToBigQuery(
                table='프로젝트 ID:데이터세트명.테이블명',    # Pub/Sub 을 통해 받아온 데이터를 처리 후 저장 할 bigquery 테이블
                schema='response_code:STRING, count:INTEGER',
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
            )
        )

if __name__ == "__main__":
    run()

해당 스크립트를 실행 하면 Google Cloud Dataflow 에서 작업이 실행됨

  • Pub/Sub 에 실시간으로 데이터 전송
from google.cloud import pubsub_v1
import json
import os

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "Google Cloud 서비스 계정 키 파일"

# Google Cloud Pub/Sub 클라이언트 초기화
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path('프로젝트 ID', 'topic 이름')

# 전송할 데이터 생성
message_data = {
    "timestamp": "2000-01-01T12:34:56Z",
    "ip": "192.168.0.1",
    "url": "/home",
    "response_code": 200,
    "response_time_ms": 123
}

# JSON 데이터를 문자열로 변환하고 UTF-8 인코딩 후 Pub/Sub로 전송
message_json = json.dumps(message_data)
message_bytes = message_json.encode('utf-8')

# 메시지 전송
future = publisher.publish(topic_path, data=message_bytes)
print(future.result())  # 메시지가 성공적으로 전송되면 메시지 ID 출력
profile
나도 할래 기술 블로그!

0개의 댓글