Forex 데이터 : 외환 거래 데이터
DAG
= Data Pipeline
from airflow import DAG
from datetime import datetime, timedelta
default_args = {
"owner": "airflow",
"email_on_failure": False, # 실패 시 이메일
"email_on_retry": False, # retry 시 이메일
"email": "admin@localhost.com", # 이메일 주소 설정
"retries": 1, # retry는 한번만
"retry_delay": timedelta(minutes=5), # 작업을 retry 하기 전에 5분을 대기
}
# 파라미터로 DAG ID, 데이터 파이프라인 내의 고유 식별자
# 모든 DAG ID는 고유해야합니다.
# 두번째 파라미터는 시작 날짜입니다. 2021년 1월 1일에 첫 일정이 잡힌다는 점입니다.
# schedule_interval은 DAG가 매일 어떤 간격을 가지고 실행될지를 말합니다.
# @daily는 하루 간격으로 실행합니다.
# 기본 인수를 정합니다.
# catchup=False면 시작 날짜 사이와 에러난 날짜에서 트리거하지 않은 DAG를 실행 하지않습니다.
with DAG("forex_data_pipeline", start_date=datetime(2021, 1, 1),
schedule_interval="@daily", default_args=default_args, catchup=False) as dag:
Http Sensor는 Http에 접근할 때, 해당 url에 접근이 가능한지를 센서를 통해 계속해서 기다릴 수 있습니다.
from airflow.providers.http.sensors.http import HttpSensor
is_forex_rates_available = HttpSensor(
task_id="is_forex_rates_available",
http_conn_id="forex_api",
endpoint="marclamberti/f45f872dea4dfd3eaa015a4a1af4b39b",
response_check=lambda response: "rates" in response.text,
# 조건이 참인지 거짓인지, 5초마다 확인하게 합니다.
poke_interval=5,
# 20초가 지나면 타임아웃을 받고, 작업은 실패가 됩니다.
# 만약 이걸 입력하지 않으면 기본으로 7일 간 자동으로 센서가 동작합니다.
timeout=20,
)
docker exec -it 6dc32bb08a02 /bin/bash
항상 모든 task는 테스트를 진행합니다.
이후 전체 dag가 잘 작동하는지 마지막에 테스트합니다.
airflow tasks {dag_id} {task_id} {execution_date}
airflow tasks test forex_data_pipeline is_forex_rates_available 2022-01-01
이렇게 하면 다른 작업의 종속성 없이 해당 task를 테스트할 수 있습니다.