DevCourse TIL Day3 Week9

김태준·2023년 6월 1일
0

Data Enginnering DevCourse

목록 보기
39/93
post-thumbnail

오늘 학습할 내용은 어제에 이어서 GCS를 DW인 Bigquery에 자동으로 연동하는 방법에 대해 알아보고자 한다.

✅ Cloud Function

  • AWS의 lambda와 유사한 기능을 수행하며 클라우드 환경 내 코드를 간단히 실행할 수 있다.
  • 자동화를 위해 cloud scheduler에서 주기적으로 cloud function함수를 실행시켜준다.
    구체적으로, cloud function을 만들면 다음과 같다.
import os
import pandas as pd
from google.cloud import bigquery
from google.cloud import storage

project_id = os.environ.get('GCP_PROJECT')

def upload_csv_to_bigquery(data, context):
    # GCS 버킷 내 파일 정보
    bucket_name = data['bucket']
    file_name = data['name']
    file_path = f"gs://{bucket_name}/{file_name}"

    # 원하는 폴더 경로
    target_folder = 'gs://hale-posting-bucket/google_trend/'

    # 해당 폴더 내 파일만 처리
    if not file_name.startswith(target_folder):
        print(f"Ignoring file {file_name}. It is not in the target folder.")
        return
    
    # 빅쿼리 정보
    dataset_id = 'job-posting-api-388303.external'
    table_id = 'job-posting-api-388303.external.google_trend'

    # 스키마 정의
    schema = [
        bigquery.SchemaField("column1", "STRING"),
        bigquery.SchemaField("column2", "INTEGER"),
        bigquery.SchemaField("column3", "FLOAT"),
    ]

    # BigQuery, GCS 클라이언트 생성
    bq_client = bigquery.Client(project=project_id)
    gcs_client = storage.Client(project=project_id)

    df = pd.read_csv(file_path)

    job_config = bigquery.LoadJobConfig(
        autodetect=True,
        schema=schema,
        write_disposition='WRITE_APPEND'
    )

    table_ref = bq_client.dataset(dataset_id).table(table_id)
    load_job = bq_client.load_table_from_dataframe(df, table_ref, job_config=job_config)
    load_job.result()

    print(f"CSV file {file_name} uploaded to BigQuery table {dataset_id}.{table_id}")

# Cloud Function entry point
def gcs_to_bigquery(event, context):
    # CSV 파일 업로드를 트리거한 GCS 이벤트 정보
    data = event

    # CSV 파일 업로드 처리 함수 호출
    upload_csv_to_bigquery(data, context)

근데 위 코드를 실행해도 에러가 발생한다..
추가로 존재하는 requirements.txt에 pandas, google-cloud-bigquery, google-cloud-storage 를 설치해도 에러가 발생하니.. 참.. 후
파이썬으로 cloud function 코드를 찾아 실행하는 것도 문제가 있고 표본도 적어 해당 작업에는 꽤 시간이 걸릴 것 같다.

✅ Method 2 - GCE & cron

GCE(Computer Engine) : GCP 중 하나로, 가상 머신 인스턴스를 실행하는데 사용되는 서비스이다. GCE를 사용하게 되면 user는 가상 머신 인스턴스를 프로비저닝하고 OS를 선택하고 필요한 SW 설치, 애플리케이션 실행 등 여러 작업을 수행할 수 있다.

음, AWS 상에서 EC2로 생각하면 될 것 같다. 통상 GCP에서는 VM이라고 부르며 더 간단히 정리하면 가상머신(VM)관리하고 프로비저닝하는 기능이다.

cron : unix(리눅스 전용) 계열 OS에서 주기적으로 작업을 실행하기 위한 시스템으로 이를 사용하면 user는 특정 시간에 스크립트, 명령을 자동으로 실행할 수 있다.
윈도우에선 실행 X....

🎈 GCE 특징

  • VM에 인스턴스 생성 후 인스턴스의 lifecycle 관리
  • 복수의 vm 인스턴스의 load-balancing과 auto-scaling 처리
  • 복수의 VM 인스턴스의 네트워크 연결 및 구성 관리

🎈 Instance Template

인스턴스를 생성할 때마다 VM 인스턴스의 세부정보(이미지, 유형, region, Zone등)을 지정해야 하는 것을 번거로울 수 있다.

  • zone : region 내 GCP 리소스 배포 영역
    이러한 번거로움을 해결하기 위해 탬플릿을 만들어 인스턴스에 default로 적용시켜줄 수 있다.

✅ Cloud Function (Complete)

드디어 해결한 Cloud Function.... 후
결국 파이썬 코드로 해결했다.

from google.cloud import bigquery

def import_gt_data_to_bigquery(data, context):
    client = bigquery.Client()
    project_id = 'job-posting-api-388303'
    dataset_id = 'job-posting-api-388303.external'
    bucket_name = data['bucket']
    file_name = data['name']
    
    target_folder = 'google_trend/'
    if not file_name.startswith(target_folder):
        print(f"Ignoring file {file_name}. It is not in the target folder")
        return
    
    file_name = file_name.split('/')
    table_id = file_name[-2]
    print(file_name)
    
    file_ext = file_name[-1].split('.')[-1]
    if file_ext == 'csv':
        uri = 'gs://' + bucket_name + '/' + data['name']
        dataset_ref = client.dataset(dataset_id)
        table_ref = dataset_ref.table(table_id)

        # load config
        job_config = bigquery.LoadJobConfig()
        job_config.autodetect = True
        job_config.schema = [
        bigquery.SchemaField("date", "DATE"),
        bigquery.SchemaField("정보처리", "INTEGER"),
        bigquery.SchemaField("인공지능", "INTEGER"),
        bigquery.SchemaField("빅데이터", "INTEGER"),
        bigquery.SchemaField("백준", "INTEGER"),
        bigquery.SchemaField("프로그래머스", "INTEGER")
        ],
        job_config.write_disposition = [
            bigquery.WriteDisposition.WRITE_APPEND
        ]

        # load data
        load_job = client.load_table_from_uri(
            uri, 
            table_ref,
            job_config=job_config
        )
        print('Started job {}'.format(load_job.job_id))
        load_job.result()
        print('job finished')

        destination_table = client.get_table(dataset_ref.table(table_id))
        print('Loaded {} rows.'.format(destination_table.num_rows))
    else:
        print('Nothing To Do')
profile
To be a DataScientist

0개의 댓글