ExternalTaskSensor

yoon__0_0·2024년 6월 18일
0

이어드림 수업

목록 보기
74/103

DAG의 특정 task 상태를 Sensing 하는 방식

  • 파라미터

실습

내용

  • external_task_sensor_a
    • dags_branch_python_operator에서 task a를 sensing
    • allowd_states : skipped 상태면 센싱 된걸로 판단하겠다.
    • execution_delta : 센서의 덱 시간기준으로 언제돌았떤(과거) 스케줄을 센싱할건지?
    • dags_branch_python_operator는 1시에 돌고,external_task_sensor_a이 dag은 7시에 돌기 때문에 차이는 6시간.
  • external_task_sensor_b
    • failed_states : 언제 fail 할건지
  • external_task_sensor_c
    • allowd_states : success 상태면 성공
from airflow import DAG
from airflow.sensors.external_task import ExternalTaskSensor
import pendulum
from datetime import timedelta
from airflow.utils.state import State 

with DAG(
    dag_id='dags_external_task_sensor',
    start_date=pendulum.datetime(2024,6,16, tz='Asia/Seoul'),
    schedule='0 7 * * *',
    catchup=False
) as dag:
    external_task_sensor_a = ExternalTaskSensor(
        task_id='external_task_sensor_a',
        external_dag_id='dags_branch_python_operator',
        external_task_id='task_a',
        allowed_states=[State.SKIPPED],
        execution_delta=timedelta(hours=6),
        poke_interval=10        #10초
    )

    external_task_sensor_b = ExternalTaskSensor(
        task_id='external_task_sensor_b',
        external_dag_id='dags_branch_python_operator',
        external_task_id='task_b',
        failed_states=[State.SKIPPED],
        execution_delta=timedelta(hours=6),
        poke_interval=10        #10초
    )

    external_task_sensor_c = ExternalTaskSensor(
        task_id='external_task_sensor_c',
        external_dag_id='dags_branch_python_operator',
        external_task_id='task_c',
        allowed_states=[State.SUCCESS],
        execution_delta=timedelta(hours=6),
        poke_interval=10        #10초
    )

결과

dags_branch_python_operatordags_external_task_sensor

custom sensor

  • operator, hook 과 마찬가지로 sensor 도 원하는 기능을 넣어 custom 하게 만들 수 있음

  • dags_python_sensor에서 구현한 공공데이터 데이터셋 갱신 여부를 센싱하는 custom 센서 만들기

  • plugins/sensors/seoul_api_date_sensor.py

from airflow.sensors.base import BaseSensorOperator
from airflow.hooks.base import BaseHook
'''
서울시 공공데이터 API 추출시 특정 날짜로 업데이트 되었는지 확인하는 센서 
{dataset}/1/5/{yyyymmdd} 형태로 조회하는 데이터셋만 적용 가능
'''


class SeoulApiDateSensor(BaseSensorOperator):
    template_fields = ('endpoint', 'check_date')

    def __init__(self, dataset_nm, check_date, **kwargs):
        '''
        dataset_nm: 서울시 공공데이터 포털에서 센싱하고자 하는 데이터셋 명
        base_dt_col: 센싱 기준 컬럼 (yyyy.mm.dd... or yyyy/mm/dd... 형태만 가능)
        day_off: 배치일 기준 생성여부를 확인하고자 하는 날짜 차이를 입력 (기본값: 0)
        '''
        super().__init__(**kwargs)
        self.http_conn_id = 'openapi.seoul.go.kr'
        self.endpoint = '{{var.value.apikey_openapi_seoul_go_kr}}/json/' + \
            dataset_nm + '/1/5'  # 5건만 추출
        self.check_date = check_date

    def poke(self, context):
        import requests
        import json
        connection = BaseHook.get_connection(self.http_conn_id)
        url = f'http://{connection.host}:{connection.port}/{
            self.endpoint}/{self.check_date}'
        self.log.info(f'url: {url}')
        response = requests.get(url)
        contents = json.loads(response.text)
        self.log.info(f'response: {contents}')
        code = contents.get('CODE')

        # 에러코드 INFO-200: 해당되는 데이터가 없습니다.
        # 미 갱신시 INFO-200으로 리턴됨
        if code is not None and code == 'INFO-200':
            self.log.info('상태코드: INFO-200, 데이터 미갱신')
            return False
        elif code is None:
            keys = list(contents.keys())
            try:
                rslt_code = contents.get(keys[0]).get('RESULT').get('CODE')
                # 정상 조회 코드 (INFO-000)
                if rslt_code == 'INFO-000':
                    self.log.info('상태코드: INFO-000, 데이터 갱신 확인')
                    return True
                else:
                    self.log.info('기타 상태코드')
                    return False
            except:
                # 비정상 호출일 경우 .get('CODE')에서 에러 발생
                self.log.info('기타 상태코드')
                return False
  • dags/dags_custom_sensor.py
from sensors.seoul_api_date_sensor import SeoulApiDateSensor
from airflow import DAG
import pendulum

with DAG(
    dag_id='dags_custom_sensor',
    start_date=pendulum.datetime(2024, 6, 14, tz='Asia/Seoul'),
    schedule='0 9 * * *',
    catchup=False
) as dag:
    sensor__tb_cycle_rent_use_day_info = SeoulApiDateSensor(
        task_id='sensor__tb_cycle_rent_use_day_info',
        dataset_nm='tbCycleRentUseDayInfo',
        check_date='{{data_interval_start.in_timezone("Asia/Seoul") | ds_nodash }}'
    )
profile
신윤재입니다

0개의 댓글