Apache Airflow is an open source platform to programmatically author, schedule and monitor workflowsDynamicScalableExtensibilityWeb ServerFlask와 Gunic
하나의 노드는 Web Server와, Scheduler, MetaStore, Executor로 구성됩니다.먼저 웹서버는 Meta Database에 기반해서 Metadata를 fetch해서 Dag, Task Instance, User Interface에 해당하는 정보를
나중에 자세히 기록
localhost:8080 접속해당 토글로 Dag를 일시 정지하거나 다시 켤 수 있습니다. Dag를 수동으로 작동시켜도 해당 토글을 켜야합니다.example이라는 태그가 보입니다. 태그를 통해서 Filter를 걸어서, 해당 태그의 Dag만 볼 수 있습니다.필터를 거는
어떤 문제에 대해서는, WebUI를 사용할 수 없어서, CLI로 처리해야할 때가 있습니다. 예를 들어서 Dag를 모두 backfill 하고 싶고, Backfill 과정이 중단되더라도 트리거 되지 않은 Dag를 모두 실행하고 싶다면 이런 케이스에서는 CLI를 사용해야만
How to create a real Datapiple?What is a DAG, an Operator?How to execute a Python function?데이터 파이프라인에서는 많은 파이썬 함수를 실행하게 됩니다. 그걸 위한 Operator도 별도로 존재합니다
Forex 데이터 : 외환 거래 데이터Check availability of forex ratesCheck availability of the file having currencies to watchDownload forex rates with PythonSave th
File Sensor는 60초마다 파일이 있는지 없는지 확인합니다.Operator를 작성하면, 항상 Documentation을 확인합니다.Airflow File Sensor(https://airflow.apache.org/docs/apache-airflow/s
이번에는 에어플로우에서 가장 흔하게 사용되는 PythonOperator를 사용해볼 겁니다.Python으로 forex rates 파일을 다운로드 해보겠습니다.Python Operator함수를 정의하고, 파이썬 함수를 PythonOperator에서 python_callab
만약 파일이 기가바이트, 테라바이트 단위라면, 해당 파일을 분산 파일 시스템에 저장하고 싶을 것입니다. 분명 로컬 PC로는 한계가 있습니다.코드 작성에어플로우 컨테이너 접속 후 테스트Hue는 HDFS에 있는 파일에 쿼리 요청을 할 수 있고, 또한 HDFS의 파일을
방금 전 포스팅에서 HDFS에 데이터를 저장했습니다. 이번에는 HDFS에 저장되었으니, 이 데이터를 상호작용하는 방법을 찾아보겠습니다.하이브를 사용합니다. Hive로 데이터, 파일 위에 테이블을 만들겁니다. 그렇게 하면, SQL을 통해 데이터를 쿼리할 수 있습니다.이번
데이터 파이프라인에서 이메일을 통해서, 실패 혹은 retry에 대한 이메일을 받도록 구성할 수 있습니다.구글 이메일 앱 등록해당 링크에서 앱을 생성합니다.생성된 앱 이름과 비밀번호를 활용합니다.airflow.cfg 파일을 수정합니다.yop mail을 통해서 테스트해봅니
슬랙 링크새로운 워크스페이스를 생성합니다.monitoring 채널을 생성해봅니다.슬랙 appsCreate a New app 클릭앱을 만들고, Incoming Web Hooks를 활성화합니다.활성화 한 뒤에, Add new webhook to Workspace를 클릭해서
현재 DAG에는 종속성이 없는 것을 확인할 수 있습니다. 이 DAG의 종속성을 추가하기 위해서는, 추가적인 코드 작성이 필요합니다. downstream을 정의하기 위해서 이렇게 작성합니다.하지만 더욱 간단하고, 직관적으로 작성하는 방법이 있습니다.이렇게 작성할 수 있습
에어플로우에서 Start_date와 Schedule_interval은 처음에는 이 파라미터가 혼란스러울 수 있습니다.start_date : The date from which tasks of your DAG can be scheduled and triggered이 파라
datetime.datetime() in python gives naive datetime objects by default!a datetime without a timezone is not in UTCImport airflow.timezone to create you
task 레벨에서 정의만약 이전 task가 실패한 경우, 현재 task는 실행 X스케줄이 있는 DAG RUN의 상태의 개념으로 이해해야합니다. 어떤 데이터를 처리함에 있어서, 그 전 날의 processing이 에러를 발생시켰다면, 그 다음 날의 processing을 막
Airflow dags가 위치하는 곳dags_folder라는 파라미터로 정의되었습니다이 경로는 절대경로입니다.기본값 : $AIRFLOW_HOME/dags문제는 너무 많은 DAG들이나, DAGs가 많은 외부 파일들을 이용한다는 점입니다. 이런 문제를 해결하기 위해서 DA
에어플로우의 컨테이너의 로그로 접근해서, 위로 조금 올려보면 Gunicorn server의 상태가 나옵니다. 현재는 sync된 4개의 Worker가 있습니다. Gunicorn은 다양한 워커를 갖습니다.Sync Worker각 Worker는 한번에 하나의 요청만 처리합니다
dag_run timeoutmax_active_runs만약 이 파라미터가 16이라면 백필 과정에서 하나의 DAG의 DAG RUN을 병렬로 16개를 실행합니다. backfill 과정을 빠르게 하기 위함입니다. 이 때, 각 dag_run의 실행시간을 보장하기 위해서, da
Pytest테스트를 간단한 코드 작성으로 가능하게이해하기 쉽고, 보기 좋고, 유용한 실패 정보널리 사용되는, 잘 정리된 문서사용하기 쉽고, 확장 가능한DAG Validation TestsDAG/Pipeline Definition TestsUnit TestsIntegra