데이터 추출

Log·2022년 10월 6일
0

문서 목적

해당 문서는 데이터 파이프라인 핵심 가이드를 읽고 필요 내용을 정리한 문서이다.

데이터 추출

언어는 모두 python으로 진행
샘플 데이터는 아래 데이터로 진행

MySQL

table create

Create table

DROP TABLE IF EXISTS OnlineRetail;
CREATE TABLE OnlineRetail(
	voiceNo varchar(10),
	stockCode varchar(10),
	description varchar(200),
	quantity int,
	invoiceDate timestamp,
	unitprice double,
	customerId int,
	country varchar(20)
);

Generate data

아래와 같이 script로 밀어넣음

import pymysql
import pandas as pd

data = pd.read_excel('./Online Retail.xlsx')

host = "localhost"
port = "3306"
username = "root"
passwd = "password"
db_name = 'datapipeline'

db = pymysql.connect(user=username, passwd=passwd, host=host, db=db_name, charset='utf8')

cur = db.cursor(pymysql.cursors.DictCursor)
data_2 = data.where(data.notnull(), None)
data_value = data_2.values.tolist()

insert_sql = "INSERT INTO `OnlineRetail` VALUES (%s, %s, %s, %s, %s, %s, %s, %s);"
cur.executemany(insert_sql, data_value)
db.commit()

cur.close()
db.close()

SQL 사용 추출

구현하기 간단하지만, 대규모 데이터셋에서는 확장성이 떨어진다.
전체 추출과 증분 추출 사이에도 트레이드 오프가 있음

전체 추출

이 부분은 말그대로 테이블 전체를 가져오는 것으로, 대용량 테이블의 경우 실행하는데 오랜 시간이 걸릴 수 있음
목적지에 있는 대상 테이블을 먼저 삭제하고 새로 추출된 데이터를 대상 데이터에 로드

e.g
select *
from OnlineRetail

증분 추출

추출 작업의 마지막 실행 이후 변경되거나 추가된 원본 테이블의 레코드만 추출
목적지에 있는 대상 테이블 데이터에 추가한다. 추가함으로써 기존 데이터 뿐만 아니라 업데이트된 기록도 모두 가지게 된다.

e.g.
select *
from OnlineRetail
where invoiceDate >= {{ last_update_date }}

한계점

  1. 삭제된 행은 캡처되지 않음
    원본 테이블에서 행이 삭제되면 알 수 없으며 해당 레코드는 대상 테이블에서 아무것도 변경되지 않은 것처럼 남아 있음
  2. 원본 테이블에는 마지막 업데이트된 시간에 대한 신뢰할 수 있는 타임스탬프가 있어야 한다.
    소스 시스템 테이블에서 해당 컬럼이 없거다 업데이트되지 않는다면 소싱 불가능하다.

example code

import csv
import pymysql

last_update_date = '2011-11-01'
query = """
SELECT *
FROM OnlineRetail
WHERE  invoiceDate >= %s;
"""
output_file_name = 'online_reatail_{}.csv'.format(
	last_update_date.replace('-', '')
)

# mysql connect
host = "localhost"
port = "3306"
username = "root"
passwd = "password"
db_name = 'datapipeline'

db = pymysql.connect(user=username, passwd=passwd, port=int(port), host=host, db=db_name, charset='utf8')

if conn is None:
	raise Error("Connection Error")

cur = db.cursor()
cur.execute(query, (last_update_date, ))

result = cur.fetchall()

with open(output_file_name, 'w') as f:
	csv_w = csv.writer(f, delimiter='|')
    csv_w.writeroows(result)

cur.close()
conn.close()

이진 로그 복제

구현이 복잡하지만, 원본 테이블의 변경되는 데이터 볼륨이 크거나 MySQL 소스에서 데이터를 더 자주 수집해야 하는 경우에 적합
이진 로그의 경우 CDC의 한 형태로, 데이터 베이스에서 수행된 모든 작업에 대한 기록을 보관하는 로그이다.
원래 이진 로그의 목적은 MySQL 인스턴스로 데이터를 복제하기 위한 것(레플리카 DB)이지만, 이 것을 데이터 웨어하우스로 수집하는데 사용 가능하다.
복제 순서로는 아래와 같다.(e.g. mysql)

  1. 이진 로그 활성화 및 구성
  2. 초기 전체 테이블 추출 실행 및 로드
  3. 지속적 이진 로그 추출
  4. 추출된 이진 로그를 데이터 웨어하우스로 변환 로드

이진 로그 구성을 변경하기 전, 항상 데이터베이스 소유자와 논의하고 진행


이진 로깅 활성화 여부 조회

SELECT variable_value
FROM performance_schema.global_variables
WHERE variable_name='log_bin';
+----------------+
| variable_value |
+----------------+
| ON             |
+----------------+

이진 로깅 형식 확인

MySQL 최신 버전에서는 아래와 같이 구분

  • STATEMENT
    • 이진 로그에 행을 삽입하거나 수정하는 행동들에 대해 SQL문 자체 기록
    • 다른 MySQL 데이터 베이스로 복제하는 경우, 이게 유용하지만 데이터 웨어하우스와 호환되지 않을 수 있음
  • ROW
    • 테이블의 행에 대한 모든 변경 사항이 행 자체의 데이터로 이진 로그 행에 표시
  • MIXED
    • STATEMENT + ROW
확인 방법
SELECT variable_value
FROM performance_schema.global_variables
WHERE variable_name='binlog_format';
+----------------+
| variable_value |
+----------------+
| ROW            |
+----------------+

이진 로그 형식의 변화의 경우, my.cnf 파일에서 설정된다.

test를 위한 table 생성

CREATE TABLE IF NOT EXISTS TestBinLogStream(
	id int(10) NOT NULL AUTO_INCREMENT PRIMARY KEY,
	status varchar(30),
	createdAt TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
	updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP 
                     ON UPDATE CURRENT_TIMESTAMP
);

INSERT INTO TestBinLogStream (status) VALUES ("submitted");
INSERT INTO TestBinLogStream (status) VALUES ("submitted");
INSERT INTO TestBinLogStream (status) VALUES ("submitted");
UPDATE TestBinLogStream SET status = 'pickedUp' WHERE id=1;
INSERT INTO TestBinLogStream (status) VALUES ("submitted");
INSERT INTO TestBinLogStream (status) VALUES ("submitted");
UPDATE TestBinLogStream SET status = 'pickedUp' WHERE id=2;
UPDATE TestBinLogStream SET status = 'completed' WHERE id=1;
UPDATE TestBinLogStream SET status = 'completed' WHERE id=2;
UPDATE TestBinLogStream SET status = 'pickedUp' WHERE id=3;
UPDATE TestBinLogStream SET status = 'pickedUp' WHERE id=4;
UPDATE TestBinLogStream SET status = 'completed' WHERE id=3;
UPDATE TestBinLogStream SET status = 'pickedUp' WHERE id=5;
UPDATE TestBinLogStream SET status = 'completed' WHERE id=4;
UPDATE TestBinLogStream SET status = 'completed' WHERE id=5;
DELETE FROM TestBinLogStream WHERE id = 3;

python

library
pip install mysql-replication
code
import pymysqlreplication
from pymysqlreplication import BinLogStreamReader, row_event


def event_action_parsing(event):
    if isinstance(event, row_event.DeleteRowsEvent):
        return "delete"
    elif isinstance(event, row_event.UpdateRowsEvent):
        return "update"
    elif isinstance(event, row_event.WriteRowsEvent):
        return "insert"
    else:
        raise ValueError(f"Unknown type tracked : {type(event)}")
        
mysql_settings = {
        "host" : host,
        "port" : int(port),
        "user": username,
        "passwd": passwd
    }
    
    
bin_log_stream = BinLogStreamReader(    
    connection_settings=mysql_settings,
    server_id=0,
    only_tables=('TestBinLogStream'), # target table
    only_events=(
            row_event.DeleteRowsEvent,
            row_event.UpdateRowsEvent,
            row_event.WriteRowsEvent
        )    
)

database_event = []

for event in bin_log_stream:
    action_type = event_action_parsing(event)
    for row in event.rows:
        temp_row = {}
        temp_row['action'] = action_type
        if action_type == "update":
            temp_row.update(row["after_values"].items())
        else:
            temp_row.update(row["values"].items())
        database_event.append(temp_row)

bin_log_stream.close()
result
[
    {
        "action": "insert",
        "id": 1,
        "status": "submitted",
        "createdAt": "2022-10-06 19:15:21",
        "updated_at": "2022-10-06 19:15:21"
    }, 
    ...
    {
        "action": "update",
        "id": 5,
        "status": "completed",
        "createdAt": "2022-10-06 19:22:32",
        "updated_at": "2022-10-06 19:24:25"
    },
    {
        "action": "delete",
        "id": 3,
        "status": "completed",
        "createdAt": "2022-10-06 19:20:45",
        "updated_at": "2022-10-06 19:24:23"
    }
]


Kafka 및 Debezium을 이용한 스트리밍 데이터 수집

Debezium

Debezium is an open source distributed platform for change data capture. Start it up, point it at your databases, and your apps can start responding to all of the inserts, updates, and deletes that other apps commit to your databases. Debezium is durable and fast, so your apps can respond quickly and never miss an event, even when things go wrong.
공식문서

  • CDC 시스템에서 행 수준 변경을 캡처한 후 다른 시스템에서 사용할 수 있는 이벤트로 스트리밍 해주는 시스템
  • 아래와 같이 세가지 주요 구성 요소가 있음
    • zookeeper : 분산 환경을 관리, 각 서비스의 구성을 처리
    • kafka : 확장성이 뛰어난 데이터 파이프라인을 구축하는 데 일반적으로 사용되는 분산 스트리밍 플랫폼
    • kafka conect : 데이터를 카프카를 통해 쉽게 스트리밍할 수 있도록 카프카를 다른 시스템과 연결하는 도구. CDC 시스템의 데이터를 카프카 토픽으로 변환

Flow example

profile
열심히 정리하는 습관 기르기..

1개의 댓글

comment-user-thumbnail
2022년 10월 6일

깨달음을 얻고 갑니다

답글 달기