🈯 숙제 해설
3주차 복습
start_date
execution_date
Full Refresh
의 경우 INSERT
전에 항상 DELETE
로 기존 테이블의 내용을 지워주고 INSERT
실행Incremental Update
의 경우 타임스탬프와 같은 필드 생성 후 execution_date
활용BEGIN;(queries);END;
와 같은 방법으로 구현.ROLLBACK
이라는 sql 실행하면 임시상태의 모든 쿼리 삭제.AUTOCOMMIT
을 True, False로 할 지는 팀 내에서 정해야함.BEGIN;END;
를 쓸지, Python에서 try/except
를 쓸지는 선호 차이.🧐 4주차
Airflow DAG
default_args 예시
default_args = {
'owner': 'ownerid',
'retries': 0,
'retry_delay': timedelta(seconds=20),
'depends_on_past': False
}
dag = DAG(
"dag_v1", # DAG name
'start_date': datetime(2020,8,7,hour=0,minute=00),
# schedule (same as cronjob)
schedule_interval="0 * * * *",
# common settings
default_args=default_args
)
task_start = DummyOperator(
task_id='start',
dag=dag_runAirflow_v1
)
t1 = BashOperator(
task_id='print_date',
bash_command='date',
)
max_active_runs
: 동시 실행 가능한 DAG 수(default=16)max_active_tasks
: 동시 실행 가능한 Task 수(default=16)catchup
: 밀린 거 자동으로 채울건지default_args
로 지정되는 건 Task parameters로 Task레벨로 적용되는 것!🧐 4주차
Airflow DAG 개선
from airflow import DAG
from airflow.operators.python import PythonOperator
def func(**context) :
link = context["params"]["url"]
...
dag_name = DAG(...)
task = PythonOperator(
task_id = 'func',
python_callable = func,
params = {
"url" : "https://..."
},
dag = dag_name
)
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable
def func1(**context):
link = context["params"]["url"]
task_instance = context['task_instance']
execution_date = context['execution_date']
...
return something
def func2(**context):
text = context["task_instance"].xcom_pull(key="return_value", task_ids="func1")
...
dag_name = DAG(...)
task1 = PythonOperator(
task_id = 'func1',
python_callable = func1,
params = {
'url': Variable.get("csv_url")
},
dag = dag_name
)
task2 = PythonOperator(
task_id = 'func2',
python_callable = func2,
params = {
},
dag = dag_name
)
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from airflow.hooks.postgres_hook import PostgresHook
def connection(autocommit=False):
hook = PostgresHook(postgres_conn_id='redshift_dev_db')
conn = hook.get_conn()
conn.autocommit = autocommit
return conn.cursor()
def func1(**context):
link = context["params"]["url"]
task_instance = context['task_instance']
execution_date = context['execution_date']
...
return something
def func2(**context):
text = context["task_instance"].xcom_pull(key="return_value", task_ids="func1")
...
dag_name = DAG(...)
task1 = PythonOperator(
task_id = 'func1',
python_callable = func1,
params = {
'url': Variable.get("csv_url")
},
dag = dag_name
)
task2 = PythonOperator(
task_id = 'func2',
python_callable = func2,
params = {
},
dag = dag_name
)
Variables
와 Connections
Variables
: 자주 사용되는 configuration info들을 미리 저장해 두는 것. Connections
: 외부 서비스와 연결하기 위한 계정 정보들을 미리 저장해 두는 것. ROW_NUMBER()
를 이용해서 primary key로 partition 나누고 ts로 order by 해서 하나의 레코드만 남기기