Apache Airflow에서 제공하는 연산자(Operator) 중 하나로,
Python 함수를 실행하는 데 사용
| 매개변수 | 설명 |
|---|---|
| task_id | 태스크의 고유 ID |
| python_callable | 실행할 Python 함수 |
| op_args | 함수에 위치 기반 인자 전달 (list) |
| op_kwargs | 함수에 키워드 인자 전달 (dict) |
| provide_context | 함수에 실행 컨텍스트 제공 (kwargs로 DAG 관련 정보를 전달) |
https://github.com/learndataeng/learn-airflow/blob/main/dags/HelloWorld.py
print_hello : PythonOperator로 구성되어 있으며 먼저 실행print_goodbye : PythonOperator로 구성되어 있으며 두번째로 실행
태스크를 정의하는 새로운 방법으로, 간단한 함수나 클래스에 데코레이터를 추가하여 Airflow 태스크로 변환할 수 있도록 함
| 파라미터 | 설명 |
|---|---|
| max_active_runs | 동시에 실행 가능한 인스턴스 수 |
| max_active_tasks | 동시에 실행 가능한 태스크 수 |
| catchup | DAG를 활성화 시켰을 때 그동안의 밀린 날짜를 catchup 할것인지 (default=True) |
https://github.com/learndataeng/learn-airflow/blob/main/dags/HelloWorld_v2.py

git clone https://github.com/learndataeng/learn-airflow
cp -r learn-airflow/dags/* dags


Airflow -> Admin -> Variable -> Add Variable 추가하고 저장하기

Variable이 추가되었고 다시 DAGs 탭으로 가보면 상단에 떠있던 에러들이 최대 5분 안에 해결되는 것을 확인할 수 있음 (5분에 한번씩 스캔)

Airflow -> Admin -> Connections -> Add a new record

connection 추가 완료

name_gender_v2
etl 태스크가 한 개이기 때문에 Graph 탭을 확인하면 태스크가 한 개 인 것을 알 수 있고

실패가 뜬 이유는 Code를 확인해보면 비밀번호가 제대로 입력되어있지 않기 때문

name_gender_v4
Graph 탭에서 그 내용을 확인할 수 있음 (PythonOperator)
namegender_v5
Decorators(@)를 활용한 것을 알 수 있음
docker ps

docker exec -it <스케쥴러 컨테이너 ID> sh

현재 디렉토리의 파일을 수정된 시간 순서대로, 자세히 나열 (ls -tl) 하여 dags 폴더가 있는 것을 확인하고 dags 폴더 내의 파일들을 확인 (ls -tl dags)

airflow dags list dag 리스트를 확인해보면 웹 UI에서 봤던 것과 같은 리스트를 확인할 수 있다 (에러가 있는 5개 dag는 제외되어있음)


airflow tasks list namegender_v5

airflow variables list -> key 값 확인해서 get 매서드로 그 정보를 읽어올 수 있음
지금까지 해본 것
- csv파일을 redshift 테이블로 적재해주는 데이터 파이프라인을 airflow로 고도화
- 어떻게 Docker에 로딩할 수 있는지?
- 웹 UI, 커맨드라인을 통해 실행하는 것
- variables, connections 만드는 것