
모든 데이터를 한곳으로 모으는 것이 데이터팀의 시작
그런 데이터가 모이는 데이터 분석용 전용 데이터베이스를 데이터웨어하우스라 함
데이터 웨어하우스에 데이터를 적재하는 프로세스를 데이터파이프라인, ETL이라고 하고, 이걸 Airflow에선 Dag라고 함
외부의 데이터를 데이터 시스템안 데이터웨어하우스로 가져오는 코드를 작성하고 관리하는 방법에 대해 알아볼 것
그렇게 사용되는 가장 인기있는 framework이 airflow.
airflow 이용해서 etl 데이터 파이프라인을 작성하고 관리하는 것
데이터 웨어하우스, etl 데이터 파이프라인을 총칭해서 데이터 인프라, 데이터 플랫폼이라고 함.
데이터 조직이 발전하면 여기에 빅데이터 처리 프레임웤인 스파크,하둡이 같은게 들어가고
리얼타임 처리하는게 필요하면 Kafka, SparkStream,
머신러닝이 발전하면 NoSql, 카산드라 등등해서 굉장히 많은 다른 기술들이 데이터 인프라의 일부가 됨.
가장 기본이 되는 것 데이터 웨어하우스와 etl 프로세스이다.
기반이 만들어지면 데이터분석가가 들어와서 이 정보를 활용해서 회사의 중요한 지표를 데이터기반으로 계산해서 대시보드 툴 위에서 시각화. 그 시각화에 사용되는게 태블로 루커 수퍼셋 등
데이터 기반으로 회사가 어느방향으로 움직일지 정의가 되면 데이터과학자가 들어와서 데이터 엔지니어가 모아놓은 데이터 기반으로 알고리즘 만들어서 사용자에게 좀더 개인화된 형태로 추천을 제공하거나 검색기능을 제공하고 또 운영이 중요하면 자동화를 통해서 운영비용을 줄이는 형태로 데이터 팀이 기여. 데이터엔지니어가 만들어놓은 인프라를 기반으로 두가지 형태로 데이터팀이 회사의 발전에 기여
데이터팀이 만들어놓은 인프라를 기반으로
1. 데이터 기반 과학적 의사결정
2. 데이터를 바탕으로 한 프로덕트 개선(사용자 경험 향상, 서비스 운영비용 절감)
데이터 소스 -> 다수의 ETL (by Airflow) -> 데이터 웨어하우스 -> 대시보드
데이터 소스: MySQL(프로덕션 DB), Stripe(신용카드 데이터), Mailchimp(이메일), Zendesk (서포트 티켓), Amplitude (유저 데이터), RingCentral(콜 데이터), Salesforce(세일즈 데이터)
더 정제되고 목적이 분명한 데이터를 만드는게 요약 테이블 만들기인데 이건 ELT, 보통 데이터 분석가가 수행.

데이터 시스템이 성숙해진 회사들이 갖는 데이터시스템의 다이어그램. 많은 데이터 소스가 있을거고 있는 그대로 데이터웨어하우스에 저장하기엔 무리. 그런 데이터를 일단 데이터레이크에 저장하고 이걸 또 프로세싱해서 의미있는 것만 데이터웨어하우스에 로딩. 데이터레이크는 aws s3와 같은 스토리지. 이건 그냥 스토리지지 프로세싱하는 프로세싱 프레임워크가 연계되어있는게 아님. 보통 spark나 athena를 사용해서 데이터 레이크에 있는 데이터를 프로세싱해서 정제해서 데이터웨어하우스에 저장.
ELT는 뭐냐 ? 데이터레이크에서 정제해서 데이터 웨어하우스에 넣는 것도 ELT로 볼 수 있다.
Raw Data ETL Jobs
1. 외부(ex.페이스북 광고정보)와 내부 데이터 소스에서 데이터를 읽어다가 (많은 경우 API를 통하게 됨)
2. 적당한 데이터 포맷 변환 후 (데이터 크기가 커지면 일반 파이썬 코드로는 불가. Spark등이 필요해짐)
3. 데이터 웨어하우스 로드
이 작업은 보통 데이터 엔지니어가 함
Summary/Report Jobs
1. DW(혹은 DL)로부터 데이터를 읽어 다시 DW에 쓰는 ETL
2. Raw Data를 읽어서 일종의 리포트 형태나 써머리 형태의 테이블을 다시 만드는 용도
3. 특수한 형태로는 AB 테스트 결과를 분석하는 데이터 파이프라인도 존재
요약 테이블의 경우 SQL (CTAS를 통해)만으로 만들고 이는 데이터 분석가가 하는 것이 맞음. 데이터 엔지니어 관점에서는 어떻게 데이터 분석가들이 편하게 할 수 있는 환경을 만들어 주느냐가 관건
-> Anayltics Engineer (DBT)
Production Data Jobs
데이터 파이프라인 작성시 best practices들과 기억해두면 좋은 팁
실패를 당연히 여기고, 실패했을 때 그 사고를 살펴보는 것. 왜 이 사고가 일어났는지 이해하고 재발 방지를 위함
Extract:
Transform:
Load:
-- 모듈과 충돌로 인해 버전 맞추기
!pip install ipython-sql==0.4.1
!pip install SQLAlchemy==1.4.49
-- load
%load_ext sql
-- 코랩에서 redshift 접속
%sql postgresql://ID:PW@HOST_URL
-- 테이블 생성
%%sql
DROP TABLE IF EXISTS my_schema.name_gender;
CREATE TABLE my_schema.name_gender (
name varchar(32) primary key,
gender varchar(8)
);
--테이블 생성 확인
%%sql
SELECT *
FROM kyongjin1234.name_gender;

import psycopg2
# Redshift connection 함수
def get_Redshift_connection():
host = "HOST_URL"
redshift_user = "ID"
redshift_pass = "PW"
port = port_num
dbname = "db_name"
conn = psycopg2.connect("dbname={dbname} user={user} host={host} password={password} port={port}".format(
dbname=dbname,
user=redshift_user,
password=redshift_pass,
host=host,
port=port
))
conn.set_session(autocommit=True)
return conn.cursor()
import requests
def extract(url):
f = requests.get(url)
return (f.text)
def transform(text):
lines = text.strip().split("\n")
records = []
for l in lines:
(name, gender) = l.split(",") # l = "Keeyong,M" -> [ 'keeyong', 'M' ]
records.append([name, gender])
return records
def load(records):
"""
records = [
[ "Keeyong", "M" ],
[ "Claire", "F" ],
...
]
"""
# BEGIN과 END를 사용해서 SQL 결과를 트랜잭션으로 만들어주는 것이 좋음
cur = get_Redshift_connection()
# DELETE FROM을 먼저 수행 -> FULL REFRESH을 하는 형태
for r in records:
name = r[0]
gender = r[1]
print(name, "-", gender)
sql = "INSERT INTO kyongjin1234.name_gender VALUES ('{n}', '{g}')".format(n=name, g=gender)
cur.execute(sql)
데이터 확인

lines = transform(data)
len(lines) # 101
lines[0:10]
''' 결과
[['name', 'gender'],
['Adaleigh', 'F'],
['Amryn', 'Unisex'],
['Apurva', 'Unisex'],
['Aryion', 'M'],
['Alixia', 'F'],
['Alyssarose', 'F'],
['Arvell', 'M'],
['Aibel', 'M'],
['Atiyyah', 'F']]
'''
load(lines)
%%sql
SELECT COUNT(1)
FROM kyongjin1234.name_gender;

%%sql
SELECT *
FROM kyongjin1234.name_gender;

%%sql
SELECT gender, COUNT(1) count
FROM kyongjin1234.name_gender
GROUP BY gender;

하지만 위의 코드의 경우 문제가
1. 컬럼명(gender, name)이 레코드로 인식되어 첫번째 레코드로 추가됨
2. 데이터 추가시 테이블의 데이터를 삭제해주고 추가하지 않아 멱등성이 보장되지 않음
의 문제가 있다.
1번의 문제의 경우 transform함수를 수정하여 해결할 수 있다.
def transform(text):
lines = text.strip().split("\n")
records = []
for l in lines:
(name, gender) = l.split(",") # l = "Keeyong,M" -> [ 'keeyong', 'M' ]
if name == 'name' and gender == 'gender':
continue
records.append([name, gender])
return records
name이 'name'이고 gender가 'gender'일 경우 continue하여 다음 루프로 넘어가게끔하여 수정하였다.
2번 문제의 경우 load 함수를 수정하여 해결할 수 있다.
def load(records):
"""
records = [
[ "Keeyong", "M" ],
[ "Claire", "F" ],
...
]
"""
cur = get_Redshift_connection()
try:
cur.execute("BEGIN;")
cur.execute("DELETE FROM kyongjin1234.name_gender;")
for r in records:
name = r[0]
gender = r[1]
print(name, "-", gender)
sql = "INSERT INTO kyongjin1234.name_gender VALUES ('{n}', '{g}')".format(n=name, g=gender)
cur.execute(sql)
cur.execute("END;")
except Exception as e:
cur.execute("ROLLBACK;")
print("에러 발생:", e)
BEGIN과 END를 사용해서 SQL 결과를 트랜잭션으로 만들어주었으며 DELETE FROM을 먼저 수행하여 FULL REFRESH을 하는 형태로 코드를 수정하였다. 만약 코드 실행 시 에러가 발생할 경우 ROLLBACK을 실행한다.