[GCP] Python으로 BigQuery 자유자재로 다루기!

NewNewDaddy·2025년 1월 5일
0

데이터 분석

목록 보기
6/6
post-thumbnail

🔹 0. INTRO

🔹 1. 사전 준비 사항

▪ 1) 라이브러리 설치

  • Bigquery를 Python으로 다루기 위해서는 google-cloud-bigquery, db-dtypes, pandas-gbq 이렇게 세 가지 라이브러리가 필요하므로 설치를 먼저 해준다.

    pip install google-cloud-bigquery db-dtypes pandas-gbq

▪ 2) 서비스 계정 API Key 발급

  • Python으로 BigQuery에 저장된 데이터에 접근해야 하기 때문에 권한 인증이 필요합니다. 따라서 적절한 권한이 부여된 GCP 서비스 계정의 API Key를 다운받아야 합니다.
  • 새로운 서비스 계정 생성 후 BigQuery Admin 권한을 설정 후 Json 형식의 Key 파일을 다운받으면 되며 서비스 계정 생성 및 권한 부여, API KEY 생성 관련해서는 [GCP] Google Cloud IAM 권한 설정 기초 및 실습 이 글에서 상세히 다루고 있으므로 참고하시면 됩니다.

🔹 2. Pandas 라이브러리 활용

  • Pandas는 read_csv, read_parquet, read_excel 등 다양한 데이터 소스들에서 데이터를 가져올 수 있도록 메소드가 구현되어 있습니다. 그 중 특별한 것은 클라우드 서비스들 중 유일하게 BigQuery에 대한 메소드가 구현되어 있다는 것입니다. 물론 BigQuery 전용 Python 라이브러리가 아니기 때문에 데이터 읽기와 쓰기 기능만 지원됩니다.

▪ 1) 인증 객체 생성

  • 메소드를 활용할 때 마다 파라미터로 권한 인증 관련 객체를 넘겨주어야 하기 때문에 해당 객체를 먼저 생성합니다.
from google.oauth2 import service_account

API_KEY_PATH = "[서비스 계정 json 키 경로]"

credentials = service_account.Credentials.from_service_account_file(API_KEY_PATH)

▪ 1) 빅쿼리 데이터 읽기

project_id = "codeit-hyunsoo"
dataset = "sprint"
table = "trainer"
location = 'asia-northeast3'

# 조회 쿼리
query = f"SELECT * FROM {project_id}.{dataset}.{table}"

df = pd.read_gbq(
    query=query,
    project_id=project_id,
    credentials=credentials,
    location=location
    )
    
df.head()

▪ 2) 빅쿼리에 데이터 쓰기

project_id = "codeit-hyunsoo"
dataset = "sprint"
new_table = "trainer_new"
location = 'asia-northeast3'

df.to_gbq(
    destination_table=f"{dataset}.{new_table}", # [데이터세트].[테이블]
    project_id=project_id,
    credentials=credentials,
    location='asia-northeast3',
    if_exists='replace'  # 'fail', 'replace', 'append'
    )

🔹 3. BigQuery-Python 라이브러리 활용

  • BigQuery의 Python 전용 라이브러리인 google-cloud-bigquery의 기능을 활용하면 BigQuery를 훨씬 더 세부적으로 다루는 것이 가능합니다.

▪ 1) BigQuery Client 생성

  • 우리가 Python으로 내리는 명령을 BigQuery에 전달하여 적용될 수 있도록 해주는 Client를 먼저 생성해줍니다.
from google.cloud import bigquery
from google.oauth2 import service_account

API_KEY_PATH = "[서비스 계정 json 키 경로]"
project_id = "codeit-hyunsoo"
location = "asia-northeast3"

# 서비스 계정 KEY 인증 객체
credentials = service_account.Credentials.from_service_account_file(API_KEY_PATH)

client = bigquery.Client(
    project=project_id,
    credentials=credentials,
    location=location
    )

▪ 2) Dataset 생성, 조회, 삭제

2-1) Dataset 생성 → create_dataset()

  • 'practice' 라는 이름의 dataset을 생성
dataset_name = "practice"

dataset_obj = bigquery.Dataset(f"{project_id}.{dataset_name}") # [프로젝트id].[새로운 데이터세트 이름]

client.create_dataset(
    dataset=dataset_obj,
    exists_ok=True, # 동일한 이름의 dataset 존재시 pass
    )

2-2) Dataset 목록 조회 → list_datasets()

for dataset in client.list_datasets():
    print(dataset.dataset_id)
    
----
games
sprint
practice

2-3) Dataset 삭제 → delete_dataset()

client.delete_dataset(
    dataset=f"{project_id}.{dataset_name}",
    delete_contents=False,
    not_found_ok=True
	)

▪ 3) Table 조회, 복사, 생성, 삭제, 저장

3-1) Table 데이터 조회 → query()

  • sprint 데이터셋의 emp 테이블을 조회하여 dataframe으로 저장합니다.
# 테이블 조회 쿼리
query = f"SELECT * FROM {project_id}.sprint.emp"

data = client.query(
    query=query,
    project=project_id,
    location=location
    )
    
df = data.to_dataframe()

3-2) Table 목록 조회 → list_tables()

  • sprint 데이터셋 내에 있는 테이블의 목록을 조회합니다.
dataset_name = 'sprint'

for table in client.list_tables("dataset_name"):
    print(table.table_id)
    
---
pokemon
trainer
battle
emp

3-3) Table 복사 → copy_table()

  • sprint 데이터셋에서 practice 데이터셋으로 trainer 테이블을 복사합니다.
# sprint.trainer  -----copy----->  practice.trainer

client.copy_table(
    project=project_id,
    sources="sprint.trainer",
    destination="practice.trainer",
    location=location
)

3-4) Table 삭제 → delete_table()

  • practice 데이터셋 내의 trainer 테이블을 삭제합니다.
client.delete_table(
    table="practice.trainer",
    not_found_ok=True # 삭제 대상 테이블이 없어도 에러가 나지 않도록 하는 설정
	)

3-5) DataFrame을 BigQuery 테이블로 저장 → load_table_from_dataframe()

  • 3-1에서 생성한 DataFrame을 practice 데이터셋의 emp 테이블로 저장합니다.
client.load_table_from_dataframe(
    project=project_id,
    dataframe=df, # 저장할 DataFrame
    destination="practice.emp", # BigQuery 테이블 경로
    location=location
	)

▪ 4) BigQuery 테이블을 GCS에 파일로 저장

  • sprintemp 테이블을 GCS에 아래의 형태로 저장하는 실습 코드입니다.
    • csv 형식
    • csv.gz(csv 압축 파일) 형식

4-1) 일반 CSV 형식으로 저장

project_id = "codeit-hyunsoo"
dataset_id = "sprint"
table_id = "emp"
location = 'asia-northeast3'

dataset_ref = bigquery.DatasetReference(project_id, dataset_id)
table_ref = dataset_ref.table(table_id)

client.extract_table(
    project=project_id,
    source=table_ref,
    destination_uris="gs://[버킷 이름]/[파일 세부 저장 경로]/[파일명].csv",
    location=location
	)

4-2) 압축 CSV 형식으로 저장

dataset_ref = bigquery.DatasetReference(project_id, dataset_id)
table_ref = dataset_ref.table(table_id)

# 파일 압축으로 위한 설정
job_config = bigquery.job.ExtractJobConfig()
job_config.compression = bigquery.Compression.GZIP

client.extract_table(
    project=project_id,
    source=table_ref,
    destination_uris="gs://[버킷 이름]/[파일 세부 저장 경로]/[파일명].csv.gz",
    location=location,
    job_config=job_config
	)

▪ 5) GCS 파일을 BigQuery 테이블로 저장

  • GCS에 저장된 emp 테이블 파일을 BigQuery의 practice 데이터셋, emp 테이블로 로드하는 실습 코드입니다. 'CSV' 형식 파일과 'PARQUET' 형식 파일에 대해 각각 다룹니다.

5-1) CSV 파일 → BigQuery 테이블

job_config = bigquery.LoadJobConfig(
    autodetect=True, # 스키마 자동 감지
    skip_leading_rows=1,
    source_format=bigquery.SourceFormat.CSV
	)

client.load_table_from_uri(
    project=project_id,
    location=location,
    source_uris="[GCS의 CSV 파일 경로]",
    destination="practice.emp_csv",
    job_config=job_config
)

5-2) PARQUET 파일 → BigQuery 테이블

job_config = bigquery.LoadJobConfig(
    autodetect=True, # 스키마 자동 감지
    source_format=bigquery.SourceFormat.PARQUET
	)

client.load_table_from_uri(
    project=project_id,
    location=location,
    source_uris="[GCS의 PARQUET 파일 경로]",
    destination="practice.emp_parquet",
    job_config=job_config
)

🔹 4. OUTRO

  • Python을 활용하여 BigQuery의 Dataset 단위 명령, Table 단위 명령, 그리고 GCS(Google Cloud Storage) 와의 통합 코드 등을 알아보았습니다.
  • Pandas에서 지원하는 read_gbq(), to_gbq() 메소드를 통해 간단히 읽고 쓰는 작업이 가능했지만 보다 세부적인 작업은 google-cloud-bigquery 라이브러리를 통해서만 가능하였습니다.
  • 세부적인 작업들까지도 Python API가 지원되고 있기 때문에 BigQuery를 대상으로 하는 데이터 ETL 작업 및 데이터 파이프라인 개발 역시 충분히 가능할 것이라 생각됩니다.
profile
데이터 엔지니어의 작업공간 / #PYTHON #CLOUD #SPARK #AWS #GCP #NCLOUD

0개의 댓글