🈯 숙제 해설
3주차 복습
start_date execution_date
Full Refresh의 경우 INSERT전에 항상 DELETE로 기존 테이블의 내용을 지워주고 INSERT 실행Incremental Update의 경우 타임스탬프와 같은 필드 생성 후 execution_date 활용BEGIN;(queries);END; 와 같은 방법으로 구현.ROLLBACK이라는 sql 실행하면 임시상태의 모든 쿼리 삭제.AUTOCOMMIT을 True, False로 할 지는 팀 내에서 정해야함.BEGIN;END;를 쓸지, Python에서 try/except를 쓸지는 선호 차이.🧐 4주차
Airflow DAG
default_args 예시
default_args = {
'owner': 'ownerid',
'retries': 0,
'retry_delay': timedelta(seconds=20),
'depends_on_past': False
}
dag = DAG(
"dag_v1", # DAG name
'start_date': datetime(2020,8,7,hour=0,minute=00),
# schedule (same as cronjob)
schedule_interval="0 * * * *",
# common settings
default_args=default_args
)
task_start = DummyOperator(
task_id='start',
dag=dag_runAirflow_v1
)
t1 = BashOperator(
task_id='print_date',
bash_command='date',
)
max_active_runs : 동시 실행 가능한 DAG 수(default=16)max_active_tasks : 동시 실행 가능한 Task 수(default=16)catchup : 밀린 거 자동으로 채울건지default_args로 지정되는 건 Task parameters로 Task레벨로 적용되는 것!🧐 4주차
Airflow DAG 개선
from airflow import DAG
from airflow.operators.python import PythonOperator
def func(**context) :
link = context["params"]["url"]
...
dag_name = DAG(...)
task = PythonOperator(
task_id = 'func',
python_callable = func,
params = {
"url" : "https://..."
},
dag = dag_name
)
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable
def func1(**context):
link = context["params"]["url"]
task_instance = context['task_instance']
execution_date = context['execution_date']
...
return something
def func2(**context):
text = context["task_instance"].xcom_pull(key="return_value", task_ids="func1")
...
dag_name = DAG(...)
task1 = PythonOperator(
task_id = 'func1',
python_callable = func1,
params = {
'url': Variable.get("csv_url")
},
dag = dag_name
)
task2 = PythonOperator(
task_id = 'func2',
python_callable = func2,
params = {
},
dag = dag_name
)
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from airflow.hooks.postgres_hook import PostgresHook
def connection(autocommit=False):
hook = PostgresHook(postgres_conn_id='redshift_dev_db')
conn = hook.get_conn()
conn.autocommit = autocommit
return conn.cursor()
def func1(**context):
link = context["params"]["url"]
task_instance = context['task_instance']
execution_date = context['execution_date']
...
return something
def func2(**context):
text = context["task_instance"].xcom_pull(key="return_value", task_ids="func1")
...
dag_name = DAG(...)
task1 = PythonOperator(
task_id = 'func1',
python_callable = func1,
params = {
'url': Variable.get("csv_url")
},
dag = dag_name
)
task2 = PythonOperator(
task_id = 'func2',
python_callable = func2,
params = {
},
dag = dag_name
)
Variables와 ConnectionsVariables : 자주 사용되는 configuration info들을 미리 저장해 두는 것. Connections : 외부 서비스와 연결하기 위한 계정 정보들을 미리 저장해 두는 것. ROW_NUMBER()를 이용해서 primary key로 partition 나누고 ts로 order by 해서 하나의 레코드만 남기기