python 클라이언트 연동
설치
pip install google-cloud-bigquery
클라이언트 연결
# 서비스 계정 키 JSON 파일 경로
key_path = glob.glob("./*.json")[0]
# Credentials 객체 생성
credentials = service_account.Credentials.from_service_account_file(key_path)
client = bigquery.Client(credentials = credentials,
project = credentials.project_id)
def create_table(dataset_id, table_id, schema):
dataset_ref = client.dataset(dataset_id)
table_ref = dataset_ref.table(table_id)
table = bigquery.Table(table_ref, schema=schema)
table = client.create_table(table)
print(f'Table {table.project}.{table.dataset_id}.{table.table_id} created.')
# 예제 사용
dataset_id = 'test' # bigQuery Datasets
table_id = 'temp'
schema = [
bigquery.SchemaField('column1', 'STRING'),
bigquery.SchemaField('column2', 'INTEGER'),
]
# 테이블 생성
create_table(dataset_id, table_id, schema)
def insert_data(dataset_id, table_id, df):
dataset_ref = client.dataset(dataset_id)
table_ref = dataset_ref.table(table_id)
job_config = bigquery.LoadJobConfig(write_disposition="WRITE_TRUNCATE")
job = client.load_table_from_dataframe(df, table_ref, job_config=job_config)
job.result()
print(f'Data loaded to {table_ref.project}.{table_ref.dataset_id}.{table_ref.table_id}.')
# 데이터 삽입
data = {
'column1': ['value1', 'value2', 'value3'],
'column2': [1, 2, 3],
# Add more columns as needed
}
df = pd.DataFrame(data)
insert_data(dataset_id, table_id, df)
def insert_data(dataset_id, table_id, rows):
dataset_ref = client.dataset(dataset_id)
table_ref = dataset_ref.table(table_id)
table = client.get_table(table_ref)
errors = client.insert_rows(table, rows)
if not errors:
print(f'{len(rows)} rows inserted into {table.project}.{table.dataset_id}.{table.table_id}.')
else:
print(f'Errors during insert: {errors}')
스트리밍과 배치 처리는 데이터를 처리하고 저장하는 방식에 차이가 있습니다.
장점: 대량의 데이터를 효율적으로 처리할 수 있습니다. 한 번에 많은 양의 데이터를 읽어들이고 처리하므로 전체적인 처리 시간이 단축될 수 있습니다.
단점: 실시간으로 데이터를 처리하는 데는 적합하지 않습니다. 데이터가 쌓인 후에야 처리가 이루어지기 때문에 데이터의 신속한 반영이 필요한 경우에는 부적합할 수 있습니다.
장점: 실시간으로 데이터를 처리하고 반영할 수 있어 실시간 분석 및 응용 프로그램에 적합합니다. 데이터의 변동 사항을 신속하게 파악할 수 있습니다.
단점: 대량의 데이터를 처리하는 데는 어려울 수 있습니다. 빅데이터와 같이 대용량의 데이터를 처리해야 하는 경우에는 배치 처리가 더 효과적일 수 있습니다.
def query_data(query):
query_job = client.query(query)
results = query_job.result()
for row in results:
print(row)
query_job.to_dataframe()
select_query = f'SELECT * FROM `{client.project}.{dataset_id}.{table_id}` LIMIT 5'
query_data(select_query)
결과:
Row(('value1', 1), {'column1': 0, 'column2': 1})
Row(('value2', 2), {'column1': 0, 'column2': 1})
Row(('value3', 3), {'column1': 0, 'column2': 1})
def update_data(dataset_id, table_id, update_condition, update_values):
query = f'''
UPDATE `{client.project}.{dataset_id}.{table_id}`
SET {update_values}
WHERE {update_condition}
'''
query_job = client.query(query)
results = query_job.result()
assert query_job.num_dml_affected_rows is not None
print(f"DML query modified {query_job.num_dml_affected_rows} rows.")
return query_job.num_dml_affected_rows
# 데이터 업데이트
update_condition = 'column1 = "value1"'
update_values = 'column2 = 10'
update_data(dataset_id, table_id, update_condition, update_values)
결과 메세지 : DML query modified 1 rows.
def delete_data(dataset_id, table_id, delete_condition):
query = f'''
DELETE FROM `{client.project}.{dataset_id}.{table_id}`
WHERE {delete_condition}
'''
query_job = client.query(query)
results = query_job.result()
assert query_job.num_dml_affected_rows is not None
print(f"DML query modified {query_job.num_dml_affected_rows} rows.")
return query_job.num_dml_affected_rows
# 데이터 삭제
delete_condition = 'column1 = "value3"'
delete_data(dataset_id, table_id, delete_condition)
결과 메세지 : DML query modified 1 rows.