🈯 숙제 해설
2주차 복습
# 모범답안 : Subquery + 자기참조
WITH tmp AS (
SELECT userid
, channel
, (ROW_NUMBER()
OVER
(PARTITION BY userid ORDER BY ts asc)) AS arn
, (ROW_NUMBER()
OVER
(PARTITION BY userid ORDER BY ts desc)) AS drn
FROM 테이블1
JOIN 테이블2
ON 테이블1.컬럼 = 테이블2.컬럼
)
SELECT userid, tmp1.channel AS first, tmp2.channel AS last
FROM tmp tmp1
JOIN tmp tmp2 ON tmp1.userid = tmp2.userid
WHERE tmp1.arn = 1 and tmp2.drn = 1
ORDER BY 1;
# 내가 푼 방식 : WINDOW
SELECT
DISTINCT u.userid,
FIRST_VALUE(u.channel)
OVER (
PARTITION BY u.userid
ORDER BY s.ts
ROWS BETWEEN
UNBOUNDED PRECEDING
AND UNBOUNDED FOLLOWING # userid로 파티션되었기때문에 UNBOUNDED로 전체 다 봐줘야함
) as first_channel,
LAST_VALUE(u.channel)
OVER (
PARTITION BY u.userid
ORDER BY s.ts
ROWS BETWEEN
UNBOUNDED PRECEDING
AND UNBOUNDED FOLLOWING
) as last_channel
FROM raw_data.user_session_channel u
JOIN raw_data.session_timestamp s
ON u.sessionid = s.sessionid
ORDER BY 1;
# SUM과 COUNT는 NULL을 0 취급함.
SELECT userID
, SUM(amount) gross_revenue
, SUM(CASE WHEN 테이블2.refunded is not TRUE THEN amount END) net_revenue
FROM 테이블1
JOIN 테이블2 ON 테이블1.컬럼 = 테이블2.컬럼
GROUP BY 1
ORDER BY 2 DESC;
🧐 3주차
데이터 파이프라인과 ETL
비구조화된 데이터 처리하기
대용량 데이터를 빠르게 머신러닝 모델과 계산해서 반환해야 하는 경우(넷플릭스처럼)
# 테이블 생성
DROP TABLE IF EXISTS 본인db.테이블명;
CREATE TABLE 본인db.테이블명 (
name varchar(32),
gender varchar(8)
);
# Postgre랑 연결
import psycopg2
def get_Redshift_connection():
host = "호스트"
redshift_user = "아이디"
redshift_pass = "비밀번호"
port = 5439
dbname = "db이름"
conn = psycopg2.connect("dbname={dbname} user={user} host={host} password={password} port={port}".format(
dbname=dbname,
user=redshift_user,
password=redshift_pass,
host=host,
port=port
))
conn.set_session(autocommit=True)
return conn.cursor()
# 함수정의
import requests
def extract(url):
f = requests.get(url)
return (f.text)
def transform(text):
lines = text.split("\n")
return lines
def load(lines):
cur = get_Redshift_connection()
for r in lines:
if r != '':
(name, gender) = r.split(",")
sql = "INSERT INTO db.테이블명 VALUES ('{n}', '{g}')".format(n=name, g=gender)
cur.execute(sql)
# 실행
link = "https://s3-geospatial.s3-us-west-2.amazonaws.com/name_gender.csv"
data = extract(link)
lines = transform(data)
load(lines)
# 결과
SELECT *
FROM db.테이블;
# name gender
# name gender
# Adaleigh F
# Amryn Unisex
# Apurva Unisex
# Aryion M
# Alixia F
# Alyssarose F
# Arvell M
🧐 3주차
Airflow
Cloud Composer
/ AWS에는 MWAA
가 있음.from datetime import datetime, timedelta
from airflow import DAG
default_args = {
'owner': '누가 실행하는지',
'start_date': '언제 데이터를 넣을것인지',
'end_date': '언제까지 넣을것인지',
'email': ['에러가 나면 누구한테 이메일 보낼 것인지'],
'retries': '몇 번 다시 시도할 것인지',
'retry_delay': timedelta(minutes='몇 분 뒤 다시 시도할 것인지'),
}
test_dag = DAG(
"DAG 이름",
schedule_interval = "0 2 * * *",
default_args = default_args
)
task1 >> task 2
task 1 >> [task2, task3] >> task 4
Full Refresh
작업이 좋음.Incremental update
(바뀐것만 읽기)만 가능하다면 create, modified, deleted
이 세 가지 필드가 존재해야 함. Backfill
이 쉬워야 함(Incremental update의 경우에만). 이 때, catchup 파라미터가 True가 되어야 하고 start_date와 end_date가 적절하게 설정되어야 함. Backfill
❗❗start_date
로 지정한 시점에 시작이 아니라, 그 다음 날부터 시작start_date=2022-10-01
이면 실제 시행 날짜는 2022-10-02
. 1일부터 2일까지의 데이터가 쌓이는 시간이 필요하니까. start_date
과 execution_date
는 항상 차이가 있음.