오늘 학습할 내용은 어제에 이어서 GCS를 DW인 Bigquery에 자동으로 연동하는 방법에 대해 알아보고자 한다.
- AWS의 lambda와 유사한 기능을 수행하며 클라우드 환경 내 코드를 간단히 실행할 수 있다.
- 자동화를 위해 cloud scheduler에서 주기적으로 cloud function함수를 실행시켜준다.
구체적으로, cloud function을 만들면 다음과 같다.
import os
import pandas as pd
from google.cloud import bigquery
from google.cloud import storage
project_id = os.environ.get('GCP_PROJECT')
def upload_csv_to_bigquery(data, context):
# GCS 버킷 내 파일 정보
bucket_name = data['bucket']
file_name = data['name']
file_path = f"gs://{bucket_name}/{file_name}"
# 원하는 폴더 경로
target_folder = 'gs://hale-posting-bucket/google_trend/'
# 해당 폴더 내 파일만 처리
if not file_name.startswith(target_folder):
print(f"Ignoring file {file_name}. It is not in the target folder.")
return
# 빅쿼리 정보
dataset_id = 'job-posting-api-388303.external'
table_id = 'job-posting-api-388303.external.google_trend'
# 스키마 정의
schema = [
bigquery.SchemaField("column1", "STRING"),
bigquery.SchemaField("column2", "INTEGER"),
bigquery.SchemaField("column3", "FLOAT"),
]
# BigQuery, GCS 클라이언트 생성
bq_client = bigquery.Client(project=project_id)
gcs_client = storage.Client(project=project_id)
df = pd.read_csv(file_path)
job_config = bigquery.LoadJobConfig(
autodetect=True,
schema=schema,
write_disposition='WRITE_APPEND'
)
table_ref = bq_client.dataset(dataset_id).table(table_id)
load_job = bq_client.load_table_from_dataframe(df, table_ref, job_config=job_config)
load_job.result()
print(f"CSV file {file_name} uploaded to BigQuery table {dataset_id}.{table_id}")
# Cloud Function entry point
def gcs_to_bigquery(event, context):
# CSV 파일 업로드를 트리거한 GCS 이벤트 정보
data = event
# CSV 파일 업로드 처리 함수 호출
upload_csv_to_bigquery(data, context)
근데 위 코드를 실행해도 에러가 발생한다..
추가로 존재하는 requirements.txt에 pandas, google-cloud-bigquery, google-cloud-storage 를 설치해도 에러가 발생하니.. 참.. 후
파이썬으로 cloud function 코드를 찾아 실행하는 것도 문제가 있고 표본도 적어 해당 작업에는 꽤 시간이 걸릴 것 같다.
GCE(Computer Engine) : GCP 중 하나로, 가상 머신 인스턴스를 실행하는데 사용되는 서비스이다. GCE를 사용하게 되면 user는 가상 머신 인스턴스를 프로비저닝하고 OS를 선택하고 필요한 SW 설치, 애플리케이션 실행 등 여러 작업을 수행할 수 있다.
음, AWS 상에서 EC2로 생각하면 될 것 같다. 통상 GCP에서는 VM이라고 부르며 더 간단히 정리하면 가상머신(VM)관리하고 프로비저닝하는 기능이다.
cron : unix(리눅스 전용) 계열 OS에서 주기적으로 작업을 실행하기 위한 시스템으로 이를 사용하면 user는 특정 시간에 스크립트, 명령을 자동으로 실행할 수 있다.
윈도우에선 실행 X....🎈 GCE 특징
- VM에 인스턴스 생성 후 인스턴스의 lifecycle 관리
- 복수의 vm 인스턴스의 load-balancing과 auto-scaling 처리
- 복수의 VM 인스턴스의 네트워크 연결 및 구성 관리
🎈 Instance Template
인스턴스를 생성할 때마다 VM 인스턴스의 세부정보(이미지, 유형, region, Zone등)을 지정해야 하는 것을 번거로울 수 있다.
- zone : region 내 GCP 리소스 배포 영역
이러한 번거로움을 해결하기 위해 탬플릿을 만들어 인스턴스에 default로 적용시켜줄 수 있다.
드디어 해결한 Cloud Function.... 후
결국 파이썬 코드로 해결했다.
from google.cloud import bigquery
def import_gt_data_to_bigquery(data, context):
client = bigquery.Client()
project_id = 'job-posting-api-388303'
dataset_id = 'job-posting-api-388303.external'
bucket_name = data['bucket']
file_name = data['name']
target_folder = 'google_trend/'
if not file_name.startswith(target_folder):
print(f"Ignoring file {file_name}. It is not in the target folder")
return
file_name = file_name.split('/')
table_id = file_name[-2]
print(file_name)
file_ext = file_name[-1].split('.')[-1]
if file_ext == 'csv':
uri = 'gs://' + bucket_name + '/' + data['name']
dataset_ref = client.dataset(dataset_id)
table_ref = dataset_ref.table(table_id)
# load config
job_config = bigquery.LoadJobConfig()
job_config.autodetect = True
job_config.schema = [
bigquery.SchemaField("date", "DATE"),
bigquery.SchemaField("정보처리", "INTEGER"),
bigquery.SchemaField("인공지능", "INTEGER"),
bigquery.SchemaField("빅데이터", "INTEGER"),
bigquery.SchemaField("백준", "INTEGER"),
bigquery.SchemaField("프로그래머스", "INTEGER")
],
job_config.write_disposition = [
bigquery.WriteDisposition.WRITE_APPEND
]
# load data
load_job = client.load_table_from_uri(
uri,
table_ref,
job_config=job_config
)
print('Started job {}'.format(load_job.job_id))
load_job.result()
print('job finished')
destination_table = client.get_table(dataset_ref.table(table_id))
print('Loaded {} rows.'.format(destination_table.num_rows))
else:
print('Nothing To Do')