
$ pip install apache-beam[gcp]
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions
import os
def run():
"""
bucket이 생성되어 있어야 함.
bucket_id
ㄴ input.txt (처리하고자 하는 입력 데이터)
"""
# Google Cloud 인증 (서비스 계정 키 파일 경로 설정)
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "Google Cloud 서비스 계정 키 파일"
# 파이프라인 옵션 설정
options = PipelineOptions()
# GCP 옵션 설정
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = "Google Cloud 프로젝트 ID" # Google Cloud 프로젝트 ID 설정
google_cloud_options.region = "Dataflow를 실행할 지역" # Dataflow 실행 지역 설정 (예: 'us-central1')
google_cloud_options.staging_location = "gs://bucket이름/staging" # 스테이징 파일 경로
google_cloud_options.temp_location = "gs://bucket이름/temp" # 임시 파일 경로
options.view_as(StandardOptions).runner = "DataflowRunner" # Dataflow에서 실행
# 파이프라인 정의
with beam.Pipeline(options=options) as pipeline:
(
pipeline
| 'ReadText' >> beam.io.ReadFromText('gs://bucket이름/input.txt') # 입력 텍스트 파일
| 'SplitWords' >> beam.FlatMap(lambda line: line.split()) # 각 라인을 단어로 나누기
| 'PairWordsWithOne' >> beam.Map(lambda word: (word, 1)) # 각 단어를 (word, 1) 쌍으로 변환
| 'CountWords' >> beam.CombinePerKey(sum) # 같은 단어의 빈도 합산
| 'WriteResults' >> beam.io.WriteToText('gs://bucket이름/output.txt') # 결과를 텍스트 파일로 저장
)
if __name__ == "__main__":
run()
해당 스크립트를 실행하면 입력 데이터가 처리된 output.txt 파일이 작성됨
from google.cloud import pubsub_v1
import os
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "Google Cloud 서비스 계정 키 파일"
# Google Cloud 프로젝트 ID
project_id = "프로젝트 ID"
# 구독할 주제 이름
topic_id = "구독할 topic 이름"
# 구독의 이름
subscription_id = "생성하고자 하는 구독 이름"
# Pub/Sub 클라이언트 생성
subscriber = pubsub_v1.SubscriberClient()
# 구독의 경로 (구독 경로는 "projects/{project_id}/subscriptions/{subscription_id}" 형식)
subscription_path = subscriber.subscription_path(project_id, subscription_id)
topic_path = f"projects/{project_id}/topics/{topic_id}"
# 구독 생성
subscription = subscriber.create_subscription(
name=subscription_path,
topic=topic_path,
ack_deadline_seconds=30 # 메시지를 처리하고 승인할 수 있는 시간 (본 포스트에서는 30초로 설정)
)
print(f"Subscription created: {subscription}")
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from apache_beam.transforms.window import FixedWindows
import json
import os
# import logging
# # 로그 레벨 설정
# logging.basicConfig(level=logging.INFO)
def parse_log_line(log_line):
"""
Args
log_line: 처리하고자 하는 byte 입력
Return
log line을 디코딩 하여 필요한 데이터만 담은 딕셔너리
"""
try:
if isinstance(log_line, bytes):
log_line = log_line.decode('utf-8')
log_data = json.loads(log_line)
# logging.info(f"Parsed log data: {log_data}") # 로그 메시지 기록
return {
'ip': log_data['ip'],
'timestamp': log_data['timestamp'],
'response_code': log_data['response_code']
}
except json.JSONDecodeError:
return None
def run():
# Google Cloud 인증 (서비스 계정 키 파일 경로 설정)
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "Google Cloud 서비스 계정 키 파일"
# 파이프라인 옵션 설정
options = PipelineOptions()
# GCP 옵션 설정
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = "Google Cloud 프로젝트 ID" # Google Cloud 프로젝트 ID
google_cloud_options.region = "Dataflow를 실행할 지역" # Dataflow 실행 지역
google_cloud_options.staging_location = "gs://bucket이름/staging"
google_cloud_options.temp_location = "gs://bucket이름/temp"
# 스트리밍 모드 설정
options.view_as(StandardOptions).runner = "DataflowRunner" # Dataflow에서 실행
options.view_as(StandardOptions).streaming = True # 스트리밍 모드 설정
# 파이프라인 정의
with beam.Pipeline(options=options) as pipeline:
(
pipeline
| 'ReadFromPubSub' >> ReadFromPubSub(subscription='projects/프로젝트명/subscriptions/구독명')
| 'ParseLog' >> beam.Map(parse_log_line)
| 'FilterValidLogs' >> beam.Filter(lambda x: x is not None) # 유효한 로그만 필터링
| 'ExtractResponseCode' >> beam.Map(lambda log: (log['response_code'], 1))
| 'WindowIntoFixed' >> beam.WindowInto(FixedWindows(60)) # 1분 간격의 윈도 적용
| 'CountPerResponseCode' >> beam.CombinePerKey(sum) # 응답 코드별 집계
| 'PrepareForBigQuery' >> beam.Map(lambda record: {'response_code': record[0], 'count': record[1]})
| 'WriteToBigQuery' >> WriteToBigQuery(
table='프로젝트 ID:데이터세트명.테이블명', # Pub/Sub 을 통해 받아온 데이터를 처리 후 저장 할 bigquery 테이블
schema='response_code:STRING, count:INTEGER',
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
)
)
if __name__ == "__main__":
run()
해당 스크립트를 실행 하면 Google Cloud Dataflow 에서 작업이 실행됨
from google.cloud import pubsub_v1
import json
import os
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "Google Cloud 서비스 계정 키 파일"
# Google Cloud Pub/Sub 클라이언트 초기화
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path('프로젝트 ID', 'topic 이름')
# 전송할 데이터 생성
message_data = {
"timestamp": "2000-01-01T12:34:56Z",
"ip": "192.168.0.1",
"url": "/home",
"response_code": 200,
"response_time_ms": 123
}
# JSON 데이터를 문자열로 변환하고 UTF-8 인코딩 후 Pub/Sub로 전송
message_json = json.dumps(message_data)
message_bytes = message_json.encode('utf-8')
# 메시지 전송
future = publisher.publish(topic_path, data=message_bytes)
print(future.result()) # 메시지가 성공적으로 전송되면 메시지 ID 출력