해당 문서는 데이터 파이프라인 핵심 가이드
를 읽고 필요 내용을 정리한 문서이다.
언어는 모두 python으로 진행
샘플 데이터는 아래 데이터로 진행
DROP TABLE IF EXISTS OnlineRetail;
CREATE TABLE OnlineRetail(
voiceNo varchar(10),
stockCode varchar(10),
description varchar(200),
quantity int,
invoiceDate timestamp,
unitprice double,
customerId int,
country varchar(20)
);
아래와 같이 script로 밀어넣음
import pymysql
import pandas as pd
data = pd.read_excel('./Online Retail.xlsx')
host = "localhost"
port = "3306"
username = "root"
passwd = "password"
db_name = 'datapipeline'
db = pymysql.connect(user=username, passwd=passwd, host=host, db=db_name, charset='utf8')
cur = db.cursor(pymysql.cursors.DictCursor)
data_2 = data.where(data.notnull(), None)
data_value = data_2.values.tolist()
insert_sql = "INSERT INTO `OnlineRetail` VALUES (%s, %s, %s, %s, %s, %s, %s, %s);"
cur.executemany(insert_sql, data_value)
db.commit()
cur.close()
db.close()
구현하기 간단하지만, 대규모 데이터셋에서는 확장성이 떨어진다.
전체 추출과 증분 추출 사이에도 트레이드 오프가 있음
이 부분은 말그대로 테이블 전체를 가져오는 것으로, 대용량 테이블의 경우 실행하는데 오랜 시간이 걸릴 수 있음
목적지에 있는 대상 테이블을 먼저 삭제하고 새로 추출된 데이터를 대상 데이터에 로드
select *
from OnlineRetail
추출 작업의 마지막 실행 이후 변경되거나 추가된 원본 테이블의 레코드만 추출
목적지에 있는 대상 테이블 데이터에 추가한다. 추가함으로써 기존 데이터 뿐만 아니라 업데이트된 기록도 모두 가지게 된다.
select *
from OnlineRetail
where invoiceDate >= {{ last_update_date }}
import csv
import pymysql
last_update_date = '2011-11-01'
query = """
SELECT *
FROM OnlineRetail
WHERE invoiceDate >= %s;
"""
output_file_name = 'online_reatail_{}.csv'.format(
last_update_date.replace('-', '')
)
# mysql connect
host = "localhost"
port = "3306"
username = "root"
passwd = "password"
db_name = 'datapipeline'
db = pymysql.connect(user=username, passwd=passwd, port=int(port), host=host, db=db_name, charset='utf8')
if conn is None:
raise Error("Connection Error")
cur = db.cursor()
cur.execute(query, (last_update_date, ))
result = cur.fetchall()
with open(output_file_name, 'w') as f:
csv_w = csv.writer(f, delimiter='|')
csv_w.writeroows(result)
cur.close()
conn.close()
구현이 복잡하지만, 원본 테이블의 변경되는 데이터 볼륨이 크거나 MySQL 소스에서 데이터를 더 자주 수집해야 하는 경우에 적합
이진 로그의 경우 CDC의 한 형태로, 데이터 베이스에서 수행된 모든 작업에 대한 기록을 보관하는 로그이다.
원래 이진 로그의 목적은 MySQL 인스턴스로 데이터를 복제하기 위한 것(레플리카 DB)이지만, 이 것을 데이터 웨어하우스로 수집하는데 사용 가능하다.
복제 순서로는 아래와 같다.(e.g. mysql)
이진 로그 구성을 변경하기 전, 항상 데이터베이스 소유자와 논의하고 진행
SELECT variable_value
FROM performance_schema.global_variables
WHERE variable_name='log_bin';
+----------------+
| variable_value |
+----------------+
| ON |
+----------------+
MySQL 최신 버전에서는 아래와 같이 구분
SELECT variable_value
FROM performance_schema.global_variables
WHERE variable_name='binlog_format';
+----------------+
| variable_value |
+----------------+
| ROW |
+----------------+
이진 로그 형식의 변화의 경우, my.cnf 파일에서 설정된다.
CREATE TABLE IF NOT EXISTS TestBinLogStream(
id int(10) NOT NULL AUTO_INCREMENT PRIMARY KEY,
status varchar(30),
createdAt TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
ON UPDATE CURRENT_TIMESTAMP
);
INSERT INTO TestBinLogStream (status) VALUES ("submitted");
INSERT INTO TestBinLogStream (status) VALUES ("submitted");
INSERT INTO TestBinLogStream (status) VALUES ("submitted");
UPDATE TestBinLogStream SET status = 'pickedUp' WHERE id=1;
INSERT INTO TestBinLogStream (status) VALUES ("submitted");
INSERT INTO TestBinLogStream (status) VALUES ("submitted");
UPDATE TestBinLogStream SET status = 'pickedUp' WHERE id=2;
UPDATE TestBinLogStream SET status = 'completed' WHERE id=1;
UPDATE TestBinLogStream SET status = 'completed' WHERE id=2;
UPDATE TestBinLogStream SET status = 'pickedUp' WHERE id=3;
UPDATE TestBinLogStream SET status = 'pickedUp' WHERE id=4;
UPDATE TestBinLogStream SET status = 'completed' WHERE id=3;
UPDATE TestBinLogStream SET status = 'pickedUp' WHERE id=5;
UPDATE TestBinLogStream SET status = 'completed' WHERE id=4;
UPDATE TestBinLogStream SET status = 'completed' WHERE id=5;
DELETE FROM TestBinLogStream WHERE id = 3;
pip install mysql-replication
import pymysqlreplication
from pymysqlreplication import BinLogStreamReader, row_event
def event_action_parsing(event):
if isinstance(event, row_event.DeleteRowsEvent):
return "delete"
elif isinstance(event, row_event.UpdateRowsEvent):
return "update"
elif isinstance(event, row_event.WriteRowsEvent):
return "insert"
else:
raise ValueError(f"Unknown type tracked : {type(event)}")
mysql_settings = {
"host" : host,
"port" : int(port),
"user": username,
"passwd": passwd
}
bin_log_stream = BinLogStreamReader(
connection_settings=mysql_settings,
server_id=0,
only_tables=('TestBinLogStream'), # target table
only_events=(
row_event.DeleteRowsEvent,
row_event.UpdateRowsEvent,
row_event.WriteRowsEvent
)
)
database_event = []
for event in bin_log_stream:
action_type = event_action_parsing(event)
for row in event.rows:
temp_row = {}
temp_row['action'] = action_type
if action_type == "update":
temp_row.update(row["after_values"].items())
else:
temp_row.update(row["values"].items())
database_event.append(temp_row)
bin_log_stream.close()
[
{
"action": "insert",
"id": 1,
"status": "submitted",
"createdAt": "2022-10-06 19:15:21",
"updated_at": "2022-10-06 19:15:21"
},
...
{
"action": "update",
"id": 5,
"status": "completed",
"createdAt": "2022-10-06 19:22:32",
"updated_at": "2022-10-06 19:24:25"
},
{
"action": "delete",
"id": 3,
"status": "completed",
"createdAt": "2022-10-06 19:20:45",
"updated_at": "2022-10-06 19:24:23"
}
]
Debezium is an open source distributed platform for change data capture. Start it up, point it at your databases, and your apps can start responding to all of the inserts, updates, and deletes that other apps commit to your databases. Debezium is durable and fast, so your apps can respond quickly and never miss an event, even when things go wrong.
공식문서
깨달음을 얻고 갑니다