google-cloud-bigquery
, db-dtypes
, pandas-gbq
이렇게 세 가지 라이브러리가 필요하므로 설치를 먼저 해준다.pip install google-cloud-bigquery db-dtypes pandas-gbq
BigQuery Admin
권한을 설정 후 Json 형식의 Key 파일을 다운받으면 되며 서비스 계정 생성 및 권한 부여, API KEY 생성 관련해서는 [GCP] Google Cloud IAM 권한 설정 기초 및 실습 이 글에서 상세히 다루고 있으므로 참고하시면 됩니다.read_csv
, read_parquet
, read_excel
등 다양한 데이터 소스들에서 데이터를 가져올 수 있도록 메소드가 구현되어 있습니다. 그 중 특별한 것은 클라우드 서비스들 중 유일하게 BigQuery에 대한 메소드가 구현되어 있다는 것입니다. 물론 BigQuery 전용 Python 라이브러리가 아니기 때문에 데이터 읽기와 쓰기 기능만 지원됩니다.from google.oauth2 import service_account
API_KEY_PATH = "[서비스 계정 json 키 경로]"
credentials = service_account.Credentials.from_service_account_file(API_KEY_PATH)
project_id = "codeit-hyunsoo"
dataset = "sprint"
table = "trainer"
location = 'asia-northeast3'
# 조회 쿼리
query = f"SELECT * FROM {project_id}.{dataset}.{table}"
df = pd.read_gbq(
query=query,
project_id=project_id,
credentials=credentials,
location=location
)
df.head()
project_id = "codeit-hyunsoo"
dataset = "sprint"
new_table = "trainer_new"
location = 'asia-northeast3'
df.to_gbq(
destination_table=f"{dataset}.{new_table}", # [데이터세트].[테이블]
project_id=project_id,
credentials=credentials,
location='asia-northeast3',
if_exists='replace' # 'fail', 'replace', 'append'
)
google-cloud-bigquery
의 기능을 활용하면 BigQuery를 훨씬 더 세부적으로 다루는 것이 가능합니다.from google.cloud import bigquery
from google.oauth2 import service_account
API_KEY_PATH = "[서비스 계정 json 키 경로]"
project_id = "codeit-hyunsoo"
location = "asia-northeast3"
# 서비스 계정 KEY 인증 객체
credentials = service_account.Credentials.from_service_account_file(API_KEY_PATH)
client = bigquery.Client(
project=project_id,
credentials=credentials,
location=location
)
dataset_name = "practice"
dataset_obj = bigquery.Dataset(f"{project_id}.{dataset_name}") # [프로젝트id].[새로운 데이터세트 이름]
client.create_dataset(
dataset=dataset_obj,
exists_ok=True, # 동일한 이름의 dataset 존재시 pass
)
for dataset in client.list_datasets():
print(dataset.dataset_id)
----
games
sprint
practice
client.delete_dataset(
dataset=f"{project_id}.{dataset_name}",
delete_contents=False,
not_found_ok=True
)
sprint
데이터셋의 emp
테이블을 조회하여 dataframe으로 저장합니다.# 테이블 조회 쿼리
query = f"SELECT * FROM {project_id}.sprint.emp"
data = client.query(
query=query,
project=project_id,
location=location
)
df = data.to_dataframe()
sprint
데이터셋 내에 있는 테이블의 목록을 조회합니다.dataset_name = 'sprint'
for table in client.list_tables("dataset_name"):
print(table.table_id)
---
pokemon
trainer
battle
emp
sprint
데이터셋에서 practice
데이터셋으로 trainer
테이블을 복사합니다.# sprint.trainer -----copy-----> practice.trainer
client.copy_table(
project=project_id,
sources="sprint.trainer",
destination="practice.trainer",
location=location
)
practice
데이터셋 내의 trainer
테이블을 삭제합니다.client.delete_table(
table="practice.trainer",
not_found_ok=True # 삭제 대상 테이블이 없어도 에러가 나지 않도록 하는 설정
)
3-1
에서 생성한 DataFrame을 practice
데이터셋의 emp
테이블로 저장합니다.client.load_table_from_dataframe(
project=project_id,
dataframe=df, # 저장할 DataFrame
destination="practice.emp", # BigQuery 테이블 경로
location=location
)
sprint
의 emp
테이블을 GCS에 아래의 형태로 저장하는 실습 코드입니다.project_id = "codeit-hyunsoo"
dataset_id = "sprint"
table_id = "emp"
location = 'asia-northeast3'
dataset_ref = bigquery.DatasetReference(project_id, dataset_id)
table_ref = dataset_ref.table(table_id)
client.extract_table(
project=project_id,
source=table_ref,
destination_uris="gs://[버킷 이름]/[파일 세부 저장 경로]/[파일명].csv",
location=location
)
dataset_ref = bigquery.DatasetReference(project_id, dataset_id)
table_ref = dataset_ref.table(table_id)
# 파일 압축으로 위한 설정
job_config = bigquery.job.ExtractJobConfig()
job_config.compression = bigquery.Compression.GZIP
client.extract_table(
project=project_id,
source=table_ref,
destination_uris="gs://[버킷 이름]/[파일 세부 저장 경로]/[파일명].csv.gz",
location=location,
job_config=job_config
)
emp
테이블 파일을 BigQuery의 practice
데이터셋, emp
테이블로 로드하는 실습 코드입니다. 'CSV' 형식 파일과 'PARQUET' 형식 파일에 대해 각각 다룹니다.job_config = bigquery.LoadJobConfig(
autodetect=True, # 스키마 자동 감지
skip_leading_rows=1,
source_format=bigquery.SourceFormat.CSV
)
client.load_table_from_uri(
project=project_id,
location=location,
source_uris="[GCS의 CSV 파일 경로]",
destination="practice.emp_csv",
job_config=job_config
)
job_config = bigquery.LoadJobConfig(
autodetect=True, # 스키마 자동 감지
source_format=bigquery.SourceFormat.PARQUET
)
client.load_table_from_uri(
project=project_id,
location=location,
source_uris="[GCS의 PARQUET 파일 경로]",
destination="practice.emp_parquet",
job_config=job_config
)
read_gbq()
, to_gbq()
메소드를 통해 간단히 읽고 쓰는 작업이 가능했지만 보다 세부적인 작업은 google-cloud-bigquery
라이브러리를 통해서만 가능하였습니다.