- 4장에서는 쉘을 활용해 빅쿼리를 다루는 방법이므로 대부분 생략
5장 빅쿼리로 데이터 로드하기
- 이 장은 빅쿼리를 사용하는 부분을 스크립트로 구현하거나 자동화된 테스크로 구현할 때 유용하다.
프로그래밍 방식을 활용한 개발
- 프로그래밍 방식으로 빅쿼리를 활용하기 위해서는 각자가 선호하는 프로그래밍 언어로 구글 클라우드 클라이언트 라이브러리를 사용하는 것이 좋다.
REST API 활용하기
- 빅쿼리는 다른 구글 클라우드 서비스처럼 전통적인 JSON/REST 인터페이스를 제공하므로 직접 HTTP 요청을 보내서 빅쿼리 서비스에서 쿼리를 실행할 수 있다.
- REST API는 API가 참조하는 객체를 마치 컬렉션 내의 정적 파일처럼 보이게 하며 HTTP 동사를 이용해 CRUD 작업을 표현한다.
- 접근 토큰은 애플리케이션의 기본 자격 증명을 가져오기 위한 방법이다. 이 자격 증명은 구글 클라우드 소프트웨어 개발 키드(SDK, Software Development Kit)에 로그인할 목적으로 제공되는 임시 자격 증명이다.
구글 클라우드 클라이언트 라이브러리
export GOOGLE_APPLICATION_CREDENTIALS="key 경로"
pip install google-cloud-bigquery
from google.cloud import bigquery
bq = bigquery.Client(project=PROJECT)
dsinfo = bq.get_dataset('bigquery-public-data.london_bicycles')
dsinfo = bq.get_dataset('ch04')
dataset_id = "{}.ch05".format(PROJECT)
ds = bq.create_dataset(dataset_id, exists_ok=True)
bq.delete_dataset('ch05', not_found_ok=True)
tables = bq.list_tables('bigquery-public-data.london_bicycles')
for table in tables:
print(table.table_id)
bq.delete_table('ch05.temp_table', not_found_ok=True)
- 빅쿼리의 '시간 여행(time travel)' 기능을 이용하면 삭제한 테이블을 복구할 수 있다.(2일 이내)
- 예를 들어 최근 7일 이내의 어떤 시점에 존재했던 버전의 테이블을 복구하려면 타임스탬프를 명시해 복사본을 만들면 된다.
- 하지만 테이블을 삭제한 후 데이터셋에 같은 ID를 가진 테이블을 생성하거나, 스냅샷을 보관하는 데이터셋도 삭제했다면 스냅샷을 몽땅 잃게 된다.
table_id ='{}.ch05.temp_table'.format(PROJECT)
table = bq.create_table(table_id, exists_ok=True)
schema = [
bigquery.SchemaField("chapter", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("title", "STRING", mode="REQUIRED"),
]
table_id = '{}.ch05.temp_table'.format(PROJECT)
table = bq.get_table(table_id)
print(table.etag)
table.schema = schema
table = bq.update_table(table, ["schema"])
print(table.schema)
print(table.etag)
rows = [
(1, u'What is BigQuery?'),
(2, u'Query essentials'),
]
errors = bq.insert_rows(table, rows)
- 테이블에 행을 삽입하는 것은 스트리밍 작업이므로 테이블 메타데이터는 즉시 갱신되지 않는다.
- 테이블을 생성한 후에 스키마를 갱신하는 것보다 테이블을 생성하는 시점에 스키마를 제공하는 것이 낫다.
- 빅쿼리 파이썬 클라이언트는 데이터를 판다스와 데이터프레임으로부터 로드하는 방법, URL로부터 로드하는 방법, 로컬 파일로부터 로드하는 방법 등 세가지를 지원한다.
- 빅쿼리 파이썬 클라이언트는 데이터를 판다스와 데이터프레임으로부터 로드하는 방법
import pandas as pd
data =[
(1, u'What is Bigquery?'),
(2, u'Query essentials'),
]
df = pd.DataFrame(data, columns=['chapter', 'title'])
table_id = 'ch05.temp_table3'
job = bq.load_table_from_dataframe(df, table_id)
job.result()
print("Loaded {} rows into {}".format(job.output_rows, table_id))
- URL로부터 로드하는 방법
job_config = bigquery.LoadJobConfig()
job_config.autodetect = True
job_config.source_format = bigquery.SourceFormat.CSV
job_config.null_marker = 'NULL'
uri = "gs://bigquery-oreilly-book/college_scorecard.csv"
table_id = 'ch05.college_scorecard_gcs'
job = bq.load_table_from_uri(uri, table_id, job_config=job_config)
while not job.done():
print('.', end='', flush=True)
time.sleep(0.1)
print('Done')
table = bq.get_table(tblref)
print("Loaded {} rows into {}.".format(table.num_rows, table.table_id))
- 로컬 파일로부터 로드하는 방법
- 로컬 파일을 로드하려면 파일 객체를 생성한 후 load_table_from_file메서드를 호출하면 된다.
with gzip.open('../04_load/college_scorecard.csv.gz') as fp:
job = bq.load_table_from_file(fp, tblref, job_config=job_config)
- 빅쿼리 웹 UI에서는 쿼리 비용을 치르지 않고도 테이블을 미리 보는 기능을 지원한다. tabledata.list REST API도 같은 기능을 제공하며, 이에 따라 파이썬 API도 이 기능을 제공한다.
table_id = 'bigquery-public-data.london_bicycles.cycle_stations'
table = bq.get_table(table_id)
rows = bq.list_rows(table,
start_index=0,
max_results=5)
- 구글 클라우드 클라이언트 라이브러리의 가장 큰 장점은 쿼리의 실행에 있다. 페이지 분할, 재시도 등 여러 복잡한 내용이 투명하게 처리되기 때문이다.
query = """
SELECT
start_station_name
, AVG(duration) as duration
, COUNT(duration) as num_trips
FROM `bigquery-public-data`.london_bicycles.cycle_hire
GROUP BY start_station_name
ORDER BY num_trips DESC
LIMIT 10
"""
config = bigquery.QueryJobConfig()
config.dry_run = True
job = bq.query(query, location='EU', job_config=config)
print("This query will process {} bytes."
.format(job.total_bytes_processed))
- 연습 실행은 비용이 발생하지 않는다. 개발 및 테스트 과정에서 연습 실행을 통해 실제로 쿼리를 실행하지 않고도 선언되지 않은 변수나 쿼리 결과를 스키마를 확인할 수 있다.
query = """
SELECT
start_station_name
, AVG(duration) as duration
, COUNT(duration) as num_trips
FROM `bigquery-public-data`.london_bicycles.cycle_hire
GROUP BY start_station_name
"""
df = bq.query(query, location='EU').to_dataframe()
print(df.describe())
- 쿼리는 반드시 정적인 문자열일 필요는 없다. 쿼리를 파라미터화하고 쿼리 작업을 생성할 때 파라미터를 지정할 수도 있다. 다음 쿼리는 총 사용시간이 지정한 값도다 큰 이용횟수를 구한다. 이때 기준이 되는 사용 시간인 min_duration은 쿼리를 실행하는 시점에 사용된다.
query2 = """
SELECT
start_station_name
, COUNT(duration) as num_trips
FROM `bigquery-public-data`.london_bicycles.cycle_hire
WHERE duration >= @min_duration
GROUP BY start_station_name
ORDER BY num_trips DESC
LIMIT 10
"""
- 문자열의 형식화를 이용해 쿼리를 작성하는 것은 정말 좋지 않는 방법이다. format을 활용해 문자열을 조작하면 데이터 웨어하우스가 SQL 주입 공격의 위험에 노출된다.
- SQL Injection : 악의적인 사용자가 보안상의 취약점을 이용하여, 임의의 SQL 문을 주입하고 실행되게 하여 데이터베이스가 비정상적인 동작을 하도록 조작하는 행위
- 이름이 지정된 파라미터를 가진 쿼리를 실행할 때는 해당 파리미터 값을 job.config 파라미터를 통해 전달해야 한다.
config = bigquery.QueryJobConfig()
config.query_parameters = [
bigquery.ScalarQueryParameter('min_duration', "INT64", 600)
]
job = bq.query(query2, location='EU', job_config=config)