📚 오늘 공부한 내용
1. DAG 실행하는 방법
schedule
로 지정해 주기적으로 실행할 수 있다.
- 다른 DAG에 의해 트리거로 실행되도록 처리해 준다. (이 방법이 더 좋음)
- Explicit Trigger
- DAG A가 분명하게 다음 DAG인 DAG B를 알고 있는 트리거
- TriggerDagOperator
- Jinja TEMPLATE을 사용해야 한다.
- A->B
- Reactive Trigger
- DAG B가 DAG A가 끝나기를 대기하고 있다 DAG A의 태스크가 끝나면 실행
- ExternalTaskSensor
- DAG A가 이 사실을 알 수 없다.
- 의존 관계가 DAG A에게 보이지 않기 때문에 발생할 수 있는 이슈가 있다.
- B->A
- BranchPythonOperator는 상황에 맞춰 뒤에 어떤 태스크가 실행될지를 동적으로 설정한다. 조건에 따라 다른 태스크로 분기한다.
- LatestOnlyOperator는 실행되는 시점을 따지고 과거 데이터를 backfill 하기 위해 실행되는 거라 판단되면 실행이 중단되는 Operator이다. 불필요한 태스크를 처리한다.
- Trigger Rule 보통은 앞단 태스크가 실행이 되어야 뒤의 태스크가 실행되게 되는데 어떤 경우는 앞단 태스크가 실패하더라도 뒤 태스크는 꼭 실행되어야 하는 경우와 앞단 중에 하나만 성공해도 뒤 태스크가 실행되어야 하는 경우에 대한 처리가 있다.
2. TriggerDagOperator
- DAG A의 태스크를 TriggerDagRunOperator로 구현한다.
trigger_dag_id
뒤에 트리거 하려는 즉, 다음 DAG의 이름을 기재한다.
conf
는 DAG B에 넘기고 싶은 정보를 주는 것으로 받은 쪽 DAG에서는 Jinja Template을 사용한다면 dag_run.conf["path"])
로도 접근 가능하다. 만약 PythonOperator로 사용한다면 kwargs['dag_run'].conf.get('path')
로 작성해 준다.
- execution_date는
- reset_dag_run을 True로 해 두면 해당 실행 일자에 DAG를 실행한 정보가 있더라도 실행하라는 뜻이다. 이미 있는 경우 값은 오버라이딩 된다.
- wait_for_completion은 트리거 대상의 DAG가 끝날 때까지 기다리라는 뜻이다. DAG가 끝나면 해당 Operator가 끝나게 된다.
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
trigger_B = TriggerDagRunOperator(
task_id="trigger_B",
trigger_dag_id="트리거하려는DAG이름",
conf = {'path':'/opt/ml/conf'},
execution_date = "{{ds}}",
reset_dag_run = True,
wait_for_completion = True
)
3. Jinja Template
1) Jinja Template이란?
- Jinja Template은 Python에서 널리 사용되는 템플릿 엔진이다.
- flask에서 많이 쓰인다.
- 하나의 언어 안에서 로직 및 변수를 가지고 다양한 케이스를 처리할 수 있다.
- 변수는
{{}}
이중 중괄호를 감싸서 사용한다.
<h1>안녕하세요, {{ name }}님!</h1>
<ul>
{% for item in items %}
<li>{{ item }}</li>
{% endfor %}
</ul>
2) Airflow에서 Jinja Template
- Airflow에서 Jinja Template을 사용하면 작업 이름, 파라미터 또는 SQL 쿼리와 같은 작업 매개변수를 템플릿화된 문자열로 정의 가능하다.
- 코드의 재사용이 늘어난다.
- execution_date와 같은 태스크 실행할 때 주어지는 파라미터를 Jinja Template에서 쉽게 사용할 수 있다.
- 또한 variable, connection 등에서 사용할 수 있다.
- 모든 경우에 쓸 수 있는 게 아니라
BashOperator
에서만 사용할 수 있다.
task1 = BashOperator(
task_id='task1',
bash_command='echo "{{ ds }}"',
dag=dag
)
task2 = BashOperator(
task_id='task2',
bash_command='echo "안녕하세요, {{ params.name }}!"',
params={'name': 'John'},
dag=dag
)
3) Airflow에서 사용 가능한 Jinja 변수들
- {{ ds }}
- {{ ds_nodash }}
- {{ ts }}
- {{ dag }}
- {{ task }}
- {{ dag_run }}
- {{ var.value }}: {{ var.value.get('my.var', 'fallback') }}
- {{ var.json }}: {{ var.json.my_dict_var.key1 }}
- {{ conn }}: {{ conn.my_conn_id.login }}, {{ conn.my_conn_id.password }}
4. Sensor
1) Sensor란?
- Sensor는 특정 조건이 충족될 때까지 대기하는 Operator이다.
- 외부 리소스의 가용성이나 특정 조건의 완료와 같은 상황 동기화에 유용하다.
- Airflow에서 제공하는 내장 Sensor
- FileSensor
- HttpSensor
- SqlSensor
- TimeSensor
- ExternalTaskSensor
- poke 모드는 체크 주기를 명확하게 보장할 수 있다. worker 하나를 잡고 주기적으로 check 하는 게 poke 모드이고, default 모드이다. reschedule 모드도 존재한다.
2) External Task Sensor
- DAG B(뒤에 실행되어야 할 태스크)가 DAG A의 특정 태스크가 끝났는지를 체크한다.
- 동일한 schedule_interval을 사용해야 한다.
- 웬만해서는 사용하지 않는 것이 좋다.
from airflow.sensors.external_task import ExternalTaskSensor
waiting_for_end_of_dag_a = ExternalTaskSensor(
task_id='waiting_for_end_of_dag_a',
external_dag_id='DAG이름',
external_task_id='end',
timeout=5*60,
mode='reschedule'
)
5. Trigger Rule
- 윗단 태스크의 성공 실패 상황에 따라 뒷단 태스크의 실행 여부를 결정하고 싶을 때 사용한다.
- Operator에 trigger_rule이란 파라미터로 결정한다.
- 설정 값
- ALL_SUCCESS (default)
- ALL_FAILED
- ALL_DONE
- ONE_FAILED
- ONE_SUCCESS
- NONE_FAILED
- NONE_FAILED_MIN_ONE_SUCCESS
6. 태스크 그룹핑
- 태스크의 수가 많은 DAG라면 태스크들을 성격에 따라 관리할 필요성이 있다.
- 이전에는 SubDAG가 사용되다 Task Grouping으로 넘어가는 추세이다.
- TaskGroup 안에 TaskGroup도 가능하다. (nesting 가능)
- 태스크처럼 실행 순서 역시 정의 가능하다.
- 한 군데 묶여 보이기 때문에 관리가 쉽다는 장점이 있다.
7. Dynamic Dag
- 템플릿 형태로 DAG를 찍어내야 하는 경우가 생기는데 이를 매번 개발자가 해 주는 것이 아니라 이걸 코드를 통해서 해 준다면 더 효율적이고 생산성이 높다.
- DAG 코드를 개발자가 아닌 코드로 제공한다.
- DAG를 계속해서 만드는 것과 한 DAG 내에서 태스크를 늘리는 것 사이의 밸런스가 필요하다. (개인적인 경험은 비슷해도 오너가 다른 경우 Dag level에서 분리하는 것이 좋다. 태스크 수가 너무 많아지는 경우도 Dag를 분리하는 게 좋다.)
- Jinja 템플릿을 하나 만들어 두고, generator.py라는 변환기를 만들어 준다. config.yml 파일을 통해 세팅을 해 주고 generator.py를 돌려 주면 DAG가 생성되게 된다.
- generator.py를 자동화해 줄지 매번 사람이 돌려 줄지에 대해서는 고민해 봐야 할 문제이다.
🔎 어려웠던 내용 & 새로 알게 된 내용
📌 과제
curl -X GET --user "airflow:airflow" http://localhost:8080/api/v1/config
의 경우 Access가 거부 당하게 되는데 이를 컨트롤 해 주는 KEY가 무엇인지와 그 키를 docker-compose.yaml에 어떻게 적용해야 하는지
- docker-compose.yaml 파일에서는 airflow-common 밑에 있는 environments 밑의
AIRFLOW__WEBSERVER__EXPOSE_CONFIG
를 'true'로 설정해 주어야 한다.
- docker compose 환경을 down 한 후 다시 up을 통해 실행해 준다.
- 그 후 config API를 호출해서 결과가 나오는지 확인해 본다.
- 이번에는 access denied가 뜨지 않고 config 환경이 잘 나오는 것을 볼 수 있다.
- connections API와 variables API는 환경 변수도 리턴하는지 아닌지
- Web UI를 통해서 세팅한 variables와 connections만 출력된다.
1. task 관련 명령어
airflow tasks list DAG명
: DAG의 task list를 가지고 온다.
airflow tasks test DAG명 task명 execution_date
: 해당 DAG의 task를 실행한다.
2. docker container 내부에서 postgres 접근
psql -h postgres
: 입력 후 로그인
\dt
: 모든 테이블이 조회된다
control + L
: clear와 동일한 기능
dag_run
: 해당 테이블에 실행 정보가 저장됨
✍ 회고
- OS 환경을 맥으로 바꿨다. 맥 환경으로 세팅을 다 다시 해 줬는데 그게 꽤 걸렸다.
- Operator들이 많다는 것은 Airflow 책 스터디를 하면서도 느꼈는데 생각보다 더 많은 Operator가 존재하고 이걸 각각 어떻게 활용해야 하는지를 프로젝트를 하면서 느끼게 될 것 같다. 개인적으로 트리거는 특히나 사용할 일이 많을 것 같은데 Explicit Trigger를 쓰는 것이 더 안정적인 프로그램이 될 거라고 생각했다. B가 A의 끝남을 받는 것보다 A가 확실하게 끝난 후에 B를 진행하는 게 프로그램에서 꼬이는 부분이 안 생기지 않을까라는 생각이 들었는데 어떤 경우 Reactive Trigger를 사용했을 때 더 효율적인지 케이스가 궁금해졌다.