: 매우 단순한 데이터 파이프라인 패턴 (이 구조에서 응용이 되어 다양한 파이프라인이 생성된다.)
전체 추출 : 원본 데이터 소스에서 추출 작업을 실행할 때마다 전체 데이터를 (레코드의 모든 테이블) 읽어와서, 데이터 웨어하우스에 저장 (원본 데이터를 대체하거나 갱신)
OrderID | OrderStatus | LastUpdated |
---|---|---|
1 | BackOrdered | 2020-06-01 12:00:00 |
증분 추출 : 추출작업을 할 때마다 데이터의 변경사항만을, 즉 변경하거나 추가된 원본 테이블의 레코드만 추출
OrderID | OrderStatus | LastUpdated |
---|---|---|
1 | BackOrdered | 2020-06-01 12:00:00 |
1 | Shipped | 2020-06-09 12:00:25 |
: 대부분의 경우에는 증분 추출을 사용하는데, 전체 추출과 관련하여 몇 가지 이점이 있기 때문이다.
: 위 책에서 실습하는 내용은 각 프레임워크별 (MySQL, PostgreSQL, REST API 등) 데이터베이스에서 데이터를 추출한 후, S3 버킷에 적재하는 방식이다.
: 전체 추출과 증분 추출의 방식이 다르다.
전체 추출
import pymysql
import csv
import boto3
import configparser
parser = configparser.ConfigParser()
parser.read("pipeline.conf")
hostname = parser.get("mysql_config", "hostname")
port = parser.get("mysql_config", "port")
username = parser.get("mysql_config", "username")
dbname = parser.get("mysql_config", "database")
password = parser.get("mysql_config", "password")
conn = pymysql.connect(host=hostname,
user=username,
password=password,
db=dbname,
port=int(port))
if conn is None:
print("Error connecting to the MySQL database")
else:
print("MySQL connection established!")
m_query = "SELECT * FROM Orders;"
local_filename = "order_extract.csv"
m_cursor = conn.cursor()
m_cursor.execute(m_query)
results = m_cursor.fetchall()
with open(local_filename, 'w') as fp:
csv_w = csv.writer(fp, delimiter='|')
csv_w.writerows(results)
fp.close()
m_cursor.close()
conn.close()
# load the aws_boto_credentials values
parser = configparser.ConfigParser()
parser.read("pipeline.conf")
access_key = parser.get("aws_boto_credentials", "access_key")
secret_key = parser.get("aws_boto_credentials", "secret_key")
bucket_name = parser.get("aws_boto_credentials", "bucket_name")
s3 = boto3.client('s3', aws_access_key_id=access_key, aws_secret_access_key=secret_key)
s3_file = local_filename
s3.upload_file(local_filename, bucket_name, s3_file)
parser = configparser.ConfigParser()
parser.read("pipeline.conf") # MySQL 서버와 연결하기 위한 Configuration 설정
hostname = parser.get("mysql_config", "hostname")
port = parser.get("mysql_config", "port")
username = parser.get("mysql_config", "username")
dbname = parser.get("mysql_config", "database")
password = parser.get("mysql_config", "password")
⇒ pymysql을 통해 mysql과 연결 과정에서 필요한 정보들을 읽어온다.
m_query = "SELECT * FROM Orders;" #전체 추출 쿼리문
local_filename = "order_extract.csv"
m_cursor = conn.cursor()
m_cursor.execute(m_query)
results = m_cursor.fetchall()
with open(local_filename, 'w') as fp:
csv_w = csv.writer(fp, delimiter='|') # '|' 로 열 구분
csv_w.writerows(results)
⇒ .csv 파일에 읽어온 데이터들을 구분자 (’|’) 들을 통해 열을 구분하고 로드한다.
parser = configparser.ConfigParser() #AWS의 S3 버킷과 연결하기 위해 설정
parser.read("pipeline.conf")
access_key = parser.get("aws_boto_credentials", "access_key")
secret_key = parser.get("aws_boto_credentials", "secret_key")
bucket_name = parser.get("aws_boto_credentials", "bucket_name")
s3 = boto3.client('s3', aws_access_key_id=access_key, aws_secret_access_key=secret_key)
s3_file = local_filename
s3.upload_file(local_filename, bucket_name, s3_file)
증분 추출
import pymysql
import csv
import boto3
import configparser
import psycopg2
# get db Redshift connection info
parser = configparser.ConfigParser()
parser.read("pipeline.conf")
dbname = parser.get("aws_creds", "database")
user = parser.get("aws_creds", "username")
password = parser.get("aws_creds", "password")
host = parser.get("aws_creds", "host")
port = parser.get("aws_creds", "port")
# connect to the redshift cluster
rs_conn = psycopg2.connect(
"dbname=" + dbname
+ " user=" + user
+ " password=" + password
+ " host=" + host
+ " port=" + port)
rs_sql = """SELECT COALESCE(MAX(LastUpdated), '1900-01-01')
FROM Orders;"""
rs_cursor = rs_conn.cursor()
rs_cursor.execute(rs_sql)
result = rs_cursor.fetchone()
# there's only one row and column returned
last_updated_warehouse = result[0]
rs_cursor.close()
rs_conn.commit()
# get the MySQL connection info and connect
parser = configparser.ConfigParser()
parser.read("pipeline.conf")
hostname = parser.get("mysql_config", "hostname")
port = parser.get("mysql_config", "port")
username = parser.get("mysql_config", "username")
dbname = parser.get("mysql_config", "database")
password = parser.get("mysql_config", "password")
conn = pymysql.connect(host=hostname,
user=username,
password=password,
db=dbname,
port=int(port))
if conn is None:
print("Error connecting to the MySQL database")
else:
print("MySQL connection established!")
m_query = """SELECT *
FROM Orders
WHERE LastUpdated > %s;"""
local_filename = "order_extract.csv"
m_cursor = conn.cursor()
m_cursor.execute(m_query, (last_updated_warehouse,))
results = m_cursor.fetchall()
with open(local_filename, 'w') as fp:
csv_w = csv.writer(fp, delimiter='|')
csv_w.writerows(results)
fp.close()
m_cursor.close()
conn.close()
# load the aws_boto_credentials values
parser = configparser.ConfigParser()
parser.read("pipeline.conf")
access_key = parser.get(
"aws_boto_credentials",
"access_key")
secret_key = parser.get(
"aws_boto_credentials",
"secret_key")
bucket_name = parser.get(
"aws_boto_credentials",
"bucket_name")
s3 = boto3.client(
's3',
aws_access_key_id=access_key,
aws_secret_access_key=secret_key)
s3_file = local_filename
s3.upload_file(
local_filename,
bucket_name,
s3_file)
rs_sql = """SELECT COALESCE(MAX(LastUpdated), '1900-01-01')
FROM Orders;""" # 데이터 웨어하우스에 가장 마지막으로 데이터가 적재된 시점을 불러온다.
# 이 시점 이후의 변경사항들을 불러와야 하기 때문.
rs_cursor = rs_conn.cursor()
rs_cursor.execute(rs_sql)
result = rs_cursor.fetchone()
#가장 최근 시점만 불러오므로, 한 개의 행만이 반환된다.
last_updated_warehouse = result[0]
rs_cursor.close()
rs_conn.commit()
...
m_query = """SELECT *
FROM Orders
WHERE LastUpdated > %s;"""
m_cursor = conn.cursor()
m_cursor.execute(m_query, (last_updated_warehouse,)) # 최근 시점 이후의 데이터들을 불러온다.
results = m_cursor.fetchall()
⇒ 위의 rs_sql 쿼리문은 데이터 웨어하우스에 데이터가 마지막으로 적재된 시점을 가져온다. 그 후, 그 시점 이후로의 데이터들을 원본 데이터베이스에서 불러와 (mysql 데이터베이스에 변경사항이 있다고 가정) 변경 사항들을 가져온다.
with open(local_filename, 'w') as fp:
csv_w = csv.writer(fp, delimiter='|')
csv_w.writerows(results)
fp.close()
m_cursor.close()
conn.close()
...
s3 = boto3.client(
's3',
aws_access_key_id=access_key,
aws_secret_access_key=secret_key)
s3_file = local_filename
s3.upload_file(
local_filename,
bucket_name,
s3_file)
⇒ 변경 사항들을 .csv로 만든 다음, S3 버킷에 적재한다.
💡 이 과정을 요약하면, 데이터 웨어하우스를 최신화하는 것이 목적이므로, 데이터 웨어하우스의 마지막 변경시점 이후의 모든 변경사항을 추출하여 S3 버킷에 적재하는 것이다.: API의 엔드포인트에서 JSON 형식의 데이터들을 로드한 후, 데이터 웨어하우스에 적재하는 방식이다. (흔한 방식)
import requests
import json
import configparser
import csv
import boto3
lat = 42.36
lon = 71.05
lat_log_params = {"lat": lat, "lon": lon}
api_response = requests.get(
"http://api.open-notify.org/astros.json", params=lat_log_params
)
requests_json = json.loads(api_response.content)
all_passes = []
for response in requests_json['people']:
current_pass = []
current_pass.append(lat)
current_pass.append(lon)
current_pass.append(response['name'])
current_pass.append(response['craft'])
all_passes.append(current_pass)
export_file = "export_file.csv"
with open(export_file, 'w') as fp:
csvw = csv.writer(fp, delimiter = '|')
csvw.writerows(all_passes)
fp.close()
parser = configparser.ConfigParser()
parser.read('../pipeline.conf')
access_key = parser.get("aws_boto_credentials", "access_key")
secret_key = parser.get("aws_boto_credentials", "secret_key")
bucket_name = parser.get("aws_boto_credentials", "bucket_name")
s3 = boto3.client('s3', aws_access_key_id=access_key, aws_secret_access_key=secret_key)
s3.upload_file(export_file, bucket_name, export_file)
lat = 42.36
lon = 71.05
lat_log_params = {"lat": lat, "lon": lon} # API GET을 위한 Parameter 설정
api_response = requests.get(
"http://api.open-notify.org/astros.json", params=lat_log_params
)
requests_json = json.loads(api_response.content) #response를 통해 요청한 후, json을 받아온다.
all_passes = [] # .csv의 한 행에 넣을 행 데이터 배열
for response in requests_json['people']: #페이지의 json 파싱
current_pass = []
current_pass.append(lat)
current_pass.append(lon)
current_pass.append(response['name']) # {name : ~~}
current_pass.append(response['craft']) #{craft : ~~}
all_passes.append(current_pass)
export_file = "export_file.csv"
with open(export_file, 'w') as fp:
csvw = csv.writer(fp, delimiter = '|') # 열 구분
csvw.writerows(all_passes)
fp.close()
: Kafka의 실시간 스트리밍 데이터 수집을 이용한 CDC 구조
수집 및 적재 과정