[Data]. GCP BigQuery python Client 사용하기

jongmin-oh·2023년 11월 16일
0

python 클라이언트 연동

참고: https://wooiljeong.github.io/python/python-bigquery/

설치

pip install google-cloud-bigquery

클라이언트 연결

# 서비스 계정 키 JSON 파일 경로
key_path = glob.glob("./*.json")[0]

# Credentials 객체 생성
credentials = service_account.Credentials.from_service_account_file(key_path)

client = bigquery.Client(credentials = credentials, 
                         project = credentials.project_id)

테이블 생성(create)

def create_table(dataset_id, table_id, schema):
    dataset_ref = client.dataset(dataset_id)
    table_ref = dataset_ref.table(table_id)
    
    table = bigquery.Table(table_ref, schema=schema)
    table = client.create_table(table)

    print(f'Table {table.project}.{table.dataset_id}.{table.table_id} created.')
    
# 예제 사용
dataset_id = 'test' # bigQuery Datasets
table_id = 'temp'
schema = [
    bigquery.SchemaField('column1', 'STRING'),
    bigquery.SchemaField('column2', 'INTEGER'),
]

# 테이블 생성
create_table(dataset_id, table_id, schema)

데이터 삽입(insert)

  1. batch insert (실시간으로 데이터 갱신이 필요없는 경우)
def insert_data(dataset_id, table_id, df):
    dataset_ref = client.dataset(dataset_id)
    table_ref = dataset_ref.table(table_id)

    job_config = bigquery.LoadJobConfig(write_disposition="WRITE_TRUNCATE")
    job = client.load_table_from_dataframe(df, table_ref, job_config=job_config)
    job.result()
    print(f'Data loaded to {table_ref.project}.{table_ref.dataset_id}.{table_ref.table_id}.')

# 데이터 삽입
data = {
    'column1': ['value1', 'value2', 'value3'],
    'column2': [1, 2, 3],
    # Add more columns as needed
}

df = pd.DataFrame(data)
insert_data(dataset_id, table_id, df)
  1. streaming insert (실시간으로 데이터 갱신이 필요한 경우)
  • 빅쿼리의 스트리밍 삽입은 무료 티어에서 허용되지 않음
def insert_data(dataset_id, table_id, rows):
    dataset_ref = client.dataset(dataset_id)
    table_ref = dataset_ref.table(table_id)
    
    table = client.get_table(table_ref)

    errors = client.insert_rows(table, rows)

    if not errors:
        print(f'{len(rows)} rows inserted into {table.project}.{table.dataset_id}.{table.table_id}.')
    else:
        print(f'Errors during insert: {errors}')

스트리밍과 배치 처리는 데이터를 처리하고 저장하는 방식에 차이가 있습니다.

배치 처리 (Batch Processing)

  • 장점: 대량의 데이터를 효율적으로 처리할 수 있습니다. 한 번에 많은 양의 데이터를 읽어들이고 처리하므로 전체적인 처리 시간이 단축될 수 있습니다.

  • 단점: 실시간으로 데이터를 처리하는 데는 적합하지 않습니다. 데이터가 쌓인 후에야 처리가 이루어지기 때문에 데이터의 신속한 반영이 필요한 경우에는 부적합할 수 있습니다.

스트리밍 처리 (Stream Processing)

  • 장점: 실시간으로 데이터를 처리하고 반영할 수 있어 실시간 분석 및 응용 프로그램에 적합합니다. 데이터의 변동 사항을 신속하게 파악할 수 있습니다.

  • 단점: 대량의 데이터를 처리하는 데는 어려울 수 있습니다. 빅데이터와 같이 대용량의 데이터를 처리해야 하는 경우에는 배치 처리가 더 효과적일 수 있습니다.

데이터 조회

  1. 한줄씩 읽기
def query_data(query):
    query_job = client.query(query)
    results = query_job.result()

    for row in results:
        print(row)
  1. 데이터 프레임으로 받아서 읽기
query_job.to_dataframe()

select_query = f'SELECT * FROM `{client.project}.{dataset_id}.{table_id}` LIMIT 5'
query_data(select_query)

결과:

Row(('value1', 1), {'column1': 0, 'column2': 1})
Row(('value2', 2), {'column1': 0, 'column2': 1})
Row(('value3', 3), {'column1': 0, 'column2': 1})

데이터 업데이트

def update_data(dataset_id, table_id, update_condition, update_values):
    query = f'''
        UPDATE `{client.project}.{dataset_id}.{table_id}`
        SET {update_values}
        WHERE {update_condition}
    '''

    query_job = client.query(query)
    results = query_job.result()

    assert query_job.num_dml_affected_rows is not None

    print(f"DML query modified {query_job.num_dml_affected_rows} rows.")
    return query_job.num_dml_affected_rows
# 데이터 업데이트
update_condition = 'column1 = "value1"'
update_values = 'column2 = 10'
update_data(dataset_id, table_id, update_condition, update_values)

결과 메세지 : DML query modified 1 rows.

데이터 삭제

def delete_data(dataset_id, table_id, delete_condition):
    query = f'''
        DELETE FROM `{client.project}.{dataset_id}.{table_id}`
        WHERE {delete_condition}
    '''

    query_job = client.query(query)
    results = query_job.result()

    assert query_job.num_dml_affected_rows is not None

    print(f"DML query modified {query_job.num_dml_affected_rows} rows.")
    return query_job.num_dml_affected_rows
# 데이터 삭제
delete_condition = 'column1 = "value3"'
delete_data(dataset_id, table_id, delete_condition)

결과 메세지 : DML query modified 1 rows.

profile
스타트업에서 자연어처리 챗봇을 연구하는 머신러닝 개발자입니다.

0개의 댓글