[Airflow] PythonOperator를 사용해서 Airflow DAG 작성하기

MINJI·2024년 11월 23일

⭐ PythonOperator & Aiflow Decorators(@)

1. PythonOperator

개념

Apache Airflow에서 제공하는 연산자(Operator) 중 하나로, Python 함수를 실행하는 데 사용

  • PythonOperator는 사용자가 정의한 Python 함수를 실행
  • 실행할 함수에 파라미터를 전달할 수 있음
  • Airflow의 실행 컨텍스트(예: DAG, 태스크 실행 정보 등)를 함수 내부에서 사용할 수 있음

중요 매개변수

매개변수설명
task_id태스크의 고유 ID
python_callable실행할 Python 함수
op_args함수에 위치 기반 인자 전달 (list)
op_kwargs함수에 키워드 인자 전달 (dict)
provide_context함수에 실행 컨텍스트 제공 (kwargs로 DAG 관련 정보를 전달)

소스코드

https://github.com/learndataeng/learn-airflow/blob/main/dags/HelloWorld.py

  • 2개의 태스크로 구성된 데이터 파이프라인 (DAG)
    print_hello : PythonOperator로 구성되어 있으며 먼저 실행
    print_goodbye : PythonOperator로 구성되어 있으며 두번째로 실행

2. Aiflow Decorators

개념

태스크를 정의하는 새로운 방법으로, 간단한 함수나 클래스에 데코레이터를 추가하여 Airflow 태스크로 변환할 수 있도록 함

  • 데코레이터를 사용하여 Python 함수를 쉽게 Airflow 태스크로 등록할 수 있음
  • 기존의 PythonOperator를 사용하는 방식보다 코드가 간결하고 읽기 쉬움
  • 데코레이터는 DAG 내에서 쉽게 태스크를 정의하고 연결할 수 있음

중요 DAG 파라미터

파라미터설명
max_active_runs동시에 실행 가능한 인스턴스 수
max_active_tasks동시에 실행 가능한 태스크 수
catchupDAG를 활성화 시켰을 때 그동안의 밀린 날짜를 catchup 할것인지 (default=True)

소스코드

https://github.com/learndataeng/learn-airflow/blob/main/dags/HelloWorld_v2.py

  • Task Decorator를 사용하면 훨씬 더 프로그램이 직관적


⭐ Name Gender 예제 프로그램 포팅

1. 소스코드 가져오기

  • GitHub 클론해오기
git clone https://github.com/learndataeng/learn-airflow
  • 클론해온 파일들을 원하는 폴더에 copy하기
cp -r learn-airflow/dags/* dags
  • airflow 로컬호스트 8080 확인해보기 (dags폴더에 git clone한 새로운 파일들이 추가되었음)

2. DAG 에러 해결하기

  • 새로운 파일들이 추가된 것을 확인할 수 있지만, 상단에 에러가 난 파일들도 확인할 수 있음

Airflow에 Variable 추가

  • Airflow -> Admin -> Variable -> Add Variable 추가하고 저장하기

  • Variable이 추가되었고 다시 DAGs 탭으로 가보면 상단에 떠있던 에러들이 최대 5분 안에 해결되는 것을 확인할 수 있음 (5분에 한번씩 스캔)

Airflow에 Connection 추가

  • Airflow -> Admin -> Connections -> Add a new record

    • Conn ID: redshift_dev_db
    • Conn Type: Amazon Redshift (혹은 Postgres)
    • Host: learnde.cduaw970ssvt.ap-northeast-2.redshift.amazonaws.com
    • Database: dev
    • User: 본인의 ID 사용
    • Password:본인의 패스워드 사용
    • Port: 5439

  • connection 추가 완료

DAG 확인해보기

name_gender_v2

  • etl 태스크가 한 개이기 때문에 Graph 탭을 확인하면 태스크가 한 개 인 것을 알 수 있고

  • 실패가 뜬 이유는 Code를 확인해보면 비밀번호가 제대로 입력되어있지 않기 때문

name_gender_v4

  • etl 태스크가 3개이기 때문에 Graph 탭에서 그 내용을 확인할 수 있음 (PythonOperator)

namegender_v5

  • PythonOperator가 아닌 Decorators(@)를 활용한 것을 알 수 있음

3. 터미널 환경에서 하는 법

airflow scheduler 안으로 들어가기

  • scheduler 커맨드에 해당하는 컨테이너 ID 복사하기
docker ps

  • airflow scheduler 안으로 로그인하기
docker exec -it <스케쥴러 컨테이너 ID> sh

dag 확인하기

  • 현재 디렉토리의 파일을 수정된 시간 순서대로, 자세히 나열 (ls -tl) 하여 dags 폴더가 있는 것을 확인하고 dags 폴더 내의 파일들을 확인 (ls -tl dags)

  • airflow dags list dag 리스트를 확인해보면 웹 UI에서 봤던 것과 같은 리스트를 확인할 수 있다 (에러가 있는 5개 dag는 제외되어있음)

task 확인하기

  • namegender_v5 DAG의 태스크 확인하기 (extract, load, transform 3개인 것을 확인)
airflow tasks list namegender_v5

variable 확인하기

  • airflow variables list -> key 값 확인해서 get 매서드로 그 정보를 읽어올 수 있음


지금까지 해본 것

  • csv파일을 redshift 테이블로 적재해주는 데이터 파이프라인을 airflow로 고도화
  • 어떻게 Docker에 로딩할 수 있는지?
  • 웹 UI, 커맨드라인을 통해 실행하는 것
  • variables, connections 만드는 것

0개의 댓글