[gcp] 주식데이터 크롤링 해서 bigquery에 적재하기 - 3

HOU·2023년 3월 23일
1

gcp로pipline만들기

목록 보기
11/11
post-thumbnail

대망의 bigQuery 적재

제일 하고 싶었던 것! 해볼 수 있게 되었다. 코드만 보면 굉장히 간단하지만 인터넷에 나와있는 여러가지 방법으로 시도했는데 다 실패햇다. 심지어 chatGPT에게 물어본 방법도 시도했지만 역시 실패하였다. ㅋㅋㅋ

상황이 좀 중요해보인다.

데이터 분석과 ML을 위해서 자동 적재 하고 싶은 파일들이 이미 데이터에 적재가 되어 있는 상태였다. 이게 굉장히 중요한 포인트가 될 수도 있을 거 같다. 이건 좀 더 확인이 필요해보인다.

여러가지 실패 케이스

  1. pandas-gbq
    판다스 데이터 프레임을 bigquery에 쉽게 적재하게 해주는 라이브러리인데 데이터프레임을 적용하려고 하니 pyarrow 에러가 계속 발생했다.. 확인해보니 버전문제라는 사람도 있고 여러가지 이유가 있었는데 가장 의심되는 것중에 하나는 파일 형식에 관련한 오류 였다.
    아래는 기본 소스 코드 이다.
import pandas as pd
import pandas_gbq


# 빅쿼리에 적재할 테이블 정보
project_id = credentials.project_id
dataset_id = 'test'
table_name = 'test_comtus'
table_id = f'{project_id}.{dataset_id}.{table_name}'

# 데이터 적재
pandas_gbq.to_gbq(
    df, table_id, project_id=project_id, if_exists='replace'
)
  1. google-cloud-bigquery이용하기
    이것도 pyarrow에러가 발생했다 내가 처음에 찾은 것은 데이터프레임 업로드에 관련한 내용이였다. chatGPT도 이 내용을 추천해 주었다. 녀석
from google.cloud import bigquery

client = bigquery.Client(credentials= credentials,
                         project = credentials.project_id)


# TODO(developer): Set table_id to the ID of the table to create.
# table_id = "your-project.your_dataset.your_table_name

job_config = bigquery.LoadJobConfig()

body = six.BytesIO(b"Washington,WA")
print(body)
# client.load_table_from_file(body, 'test_cumtus', job_config=job_config).result()
# previous_rows = client.get_table('test_comtus').num_rows
# assert previous_rows > 0

job_config = bigquery.LoadJobConfig(
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
    source_format=bigquery.SourceFormat.CSV,
    skip_leading_rows=1,
)

load_job = client.load_table_from_dataframe(
    df, 'test_comtus', job_config=job_config
)  # Make an API request.

load_job.result()  # Waits for the job to complete.

destination_table = client.get_table('test_comtus')
print("Loaded {} rows.".format(destination_table.num_rows)

두번의 실패로 pyarrow가 발생하는 문제는 기존의 파일형식과 dataframe형식간의 문제가 있는거 아닐까라는 의심이 들었다. 이건 추정이다 확실한 정보를 좀 알고 싶은데;; document에서 찾고있는 중이다.

성공한 코드

google documnet에 보니 csv 파일을 덮어 쓰는 방법이 적혀있었다. 방법은 csv 파일 자체를 읽어서 올리는 방법이다.

from google.cloud import bigquery

# client
client = bigquery.Client(credentials= credentials,
                         project = credentials.project_id);
                         
#job_config
job_config = bigquery.LoadJobConfig(
    write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE,
    source_format = bigquery.SourceFormat.CSV,
    skip_leading_rows=1,
    autodetect=True
)

# 소스 등록하기
file_path = "./data/stock_netmarble_table.csv"
with open(file_path, 'rb') as source_file:
    job = client.load_table_from_file(source_file, 'stock-predict-380701.stock.stock_netmarble_table', job_config=job_config)
                         
job.result()

table = client.get_table('stock-predict-380701.stock.stock_netmarble_table')
table.table_id

print(
    "Loaded {} rows and {} columns to {}".format(
        table.num_rows, len(table.schema), table_id
    )
)
                         

job_config에서는 여러가지를 설정할 수 있는데 WRITE_TRUNCATE는 덮어쓰기 하는 명령어이다. 이런 설정에 대해서는 GOOGLE 설명서에 잘나와 있으니 확인하기 바란다.

휴 결국 해냈다.

앞에 3가지를 합쳐서 결국에 시간에 맞추어 업데이트하는 로직을 만들었다. 다음엔.. 구글의 편한 여러가지 기능들을 적용해서 만들어봐야겠다.

profile
하루 한 걸음 성장하는 개발자

0개의 댓글