airflow를 도입하게 된 이유
슬기로운 통계생활, Airflow 마스터 클래스 강의학습 내용 정리 오픈소스 워크플로우 관리도구 with PythonPython 사용워크플로우 : DAG(Directed Acyclic Graph), DAG안에는 1개 이상의 TASK가 존재 비순환, 선후행 순서가 존재C

왼쪽에 vscode/airflow 가 로컬 환경 , 오른쪽에 wsl이 서버 환경이라고 가정하고 진행 wsl 설치,도커 설치 - mac환경이라 passymal파일 다운로드 https://airflow.apache.org/docs/apache-airflow/sta

기본설치 때 만든 wsl 내 dags폴더에 로컬 환경과 연동하여 수행 .
docker compose up 할때 아래와 같은 에러 발생airflow-init-1 | airflow already exist in the db원인 : 기존에 설치 했다가 지우고? 혹은 다시 설치 할때 발생 "airflow already exist in t

워크플로우 = DAG 오퍼레이터 = 클래스 ( 설계도 )TASK = 인스턴스 ( 객체화 ) DAG 안에 여러개의 오퍼레이터가 있고 오퍼레이터를 객체화 한 task가 존재 Task 수행 주체 1\. 파싱 : Dag 파일 읽어서, 컴파일(문법체크) , task 관계 파악
WEB UI 보여지는 DAG 이름 , .py 파일명이랑 dag_id 는 일치 시키는게 좋음 UI Graph에서 보이는 이름 , 객체명과 task_id명은 동일하게 작성하는게 편리함
분 / 시 / 일 / 월 / 요일\| from 슬기로운통계생활, Airflow 마스터 클래스
t1 >> [t3, t2] >> t4 t5 >> t4 [t4, t7] >> t6 >> t8

복잡한 로직을 처리하는 경우 ex ) 1\. sftp를 통해 파일을 받은 후 DB에 insert & tar.gz로 압축 2\. 쉘 명령어 재사용 가능 ( like db insert 시 서버 정보만 변수로 받고 insert 하는 부분은 쉘로 재사용가능)issus : 1\

g-mail 내에서 설정 >> 모든 설정 보기 >> 전달 및 POPIMAPIMAP 액세스 부분 >> IMAP 사용 google 프로필 google 계정관리 >> 보안 >> Google에 로그인 하는 방법 >> 2 단계 인증 >> 앱 비밀번호(앱선택 mail, 기기 아무
정의된 파이썬 함수를 실행시키는 Operator : 가장 많이 쓰이는 OperatorPythonOperator, BranchPythonOperator 요 두개가 가장 많이 쓰임BranchPythonOperator : 선행 Python Operator를 BranchPyt

DAG 외부 함수 호출 , 파이썬 모듈 경로 이해 from airflow.operators.python import PythonOperator 파이썬은 위 경로를 어떻게 찾을까 ? : dag에서 외부 함수를 import할때 경로는 어떻게 해야하는지 이해하기 위함 s

Python은 함수 안에 함수를 선언 가능 .AIRFLOW 마스터 클래스 내 자료 활용 모든 get_data()를 함수를 outer_func로 감싸서 수정해야 함. 불편함 추가 task decorator 연결 관계 시 task1() >> task2() 같이 함수호출로

기존 호출하는 부분은 변경하지 않고, 함수 정의 부분만 변경하여 진행가능\*args : 튜플형태로 args\[] 인덱스 형태로 추출 할수 있음count = args0 if len(args) >= 1 else None 이 부분은 튜플에 범위 이상에 값을 추출할때 안전로

Jinja 템플릿이란 ? Airflow에서 Jinja 템플릿은 DAG (Directed Acyclic Graph) 정의 파일에서 동적으로 값을 생성하거나 파일 경로, SQL 쿼리 등을 템플릿화하는 데 사용되는 강력한 기능입니다. Jinja는 Python에서 널리 사용되

일배치, 매일 0시 30분에 수행한다고 했을때 수행시점 : 2024년 6월 2일 06시
Airflow에서 매크로 변수는 DAG 실행 시점에 동적으로 생성되는 값을 나타내는 Jinja 템플릿입니다. Jinja 템플릿은 Airflow에서 널리 사용되는 텍스트 템플릿 엔진이며, 매크로 변수는 이 Jinja 템플릿의 특별한 형태입니다.DAG 실행 정보: 주로 D

https://airflow.apache.org/docs/apache-airflow/stable/\_api/airflow/operators/python/index.html! 그냥 편한걸로 ...

Airflow DAG 안 Task 간 데이터 공유를 위해 사용되는 기술ex) Task1의 수행 중 내용이나 결과를 Task2에서 사용 또는 입력으로 사용하고 싶은 경우주로 작은 규모의 데이터를 공유하기 위해 사용( Xcom 내용은 메타 DB의 xcom 테이블에 값이 저

Bash Operator에서는 마지막 리턴값이 template에 저장됨 아래 코드에서 마지막 'COMPLETE'가 return_value에 저장됨 taskid로 xcompull 시 return_value값 불러옴 bash_push = BashOperator( tas
Xcom : 특정 dag, 특정 schedule에 수행되는 task간에만 공유 가능 ? -> dag안에 task들끼리 공유, task수행 xcom값이 업데이트 됨으로 과거 시점에 xcom 값은 알기 어렵다 ? key 값이 동일하기 때문 Variable은 메타 DB
task1의 결과에 따라 하위 task하나만 수행해야 할때 사용 업로드중..선행 task에 return 값이 후행 task의 id 인데 string 으로 return 필요아래 2개만 유의하면 위 와 같음 . @task.branch(task_id = 'python_bra

Trigger란 ? 앞서 공부한 Branch 와 반대로, 하위 task가 동작하기 위한 룰로 이해 all_success vs none_failed : skipped여부 차이 상위 Task가 모두 수행되면 실행 (실패도 수행된 것에 포함)설정 : 1,2,3 >> 4 ,

Task가 갯수가 많을때 , Group화 해서 관리 하기 위함 . Task들의 모음 , Task Group안에 Task Group 중첩 사용가능 개인적으로 클래스 이용하는게 편함task순서와 동일하게 그룹간에도 아래와 같이 순서 지정 가능 group_1() >> gro

Task연결에 대한 설명 ( comment
DAG 의존관계 설정 방법 1) TriggerDagRun Operator > 다른 DAG을 실행시키는 오퍼레이터 2) External 센서 > 다른 DAG의 Task의 완료를 기다리는 센서 |비교|TriggerDagRun오퍼레이터|ExternalTask센서| |--
서울시 공공데이터 https://data.seoul.go.kr/ API 키 발급 후 , airflow web -> admin -> connections 등록 아무거나 공공데이터나 들어가서 , Host 및 Port 확인 ; Open API -> 샘플 url 확인