Sensor

yoon__0_0·2024년 6월 18일
0

이어드림 수업

목록 보기
73/103

senser란?

  • 일종의 특화된 오퍼레이터
  • 특정 조건이 만족되기를 기다리고 만족되면 True를 반환하는 Task
  • 모든 센서는 BaseSensorOperator 를 상속하여 구현되며 이는 BaseOperator를 상속함
  • 상속시에는 init() 함수와 poke(context) 함수 재정의 필요
  • 센싱하는 로직은 poke 함수에 정의
mode 파라미터

slot
runing slots + queued slots 은 Slots 값을 넘지 못함

  • poke mode
  • reschedule 모드 : sleep 시 slot를 차지 하지 않음. 주기에 의해 체크하는 동안만 running slot을 차지 함.
비교PokeReschedule
원리DAG이 수행되는 내내 Running Slot을 차지함. 다만 Slot 안에서 Sleep, active를 반복함센서가 동작하는 시기에만 Slot을 차지함. 그 외는 Slot을 차지하지 않음
Wait에서 상태runningup_for_reschedule
유리한 적용 시점짧은 센싱 간격 (interval, 초단위)긴 센싱 간격, 주로 분단위 Reschedule 될때 스케줄러의 부하 발생

pool
모든 Task는 특정 Pool에서 수행되며 Pool은 Slot을 가지고 있음
기본적으로 task 1개당 slot 1개 점유하며 pool을 지정하지 않으면 default_pool 에서 실행됨.

BashSensor

  • bash 명령을 사용한 센서.
  • bash_command 값이 0을 리턴하면 만족을 의미
  • exit 0 : 정상 (bash 에서)
  • 비정상일때는 poke_interval 기다렸다가 다시 실행하게 됨.
  • timeout : 정해진 시간만큼 센싱하고 (반복하고) 넘어가면 그냥 끝내라
  • mode = "poke" => running slot 차지
  • mode = "reschedule" => running slot 차지 안함.
  • bash_task 는 앞 두개의 센서가 둘다 정상(파일이 있으면 )일때만 실행
from airflow.sensors.bash import BashSensor
from airflow.operators.bash import BashOperator
from airflow import DAG
import pendulum

with DAG(
    dag_id='dags_bash_sensor',
    start_date=pendulum.datetime(2024,6,16, tz='Asia/Seoul'),
    schedule='0 6 * * *',
    catchup=False
) as dag:

    sensor_task_by_poke = BashSensor(
        task_id='sensor_task_by_poke',
        env={'FILE':'/opt/airflow/ingest/bikeList/{{data_interval_end.in_timezone("Asia/Seoul") | ds_nodash }}/bikeList.csv'},
        bash_command=f'''echo $FILE && 
                        if [ -f $FILE ]; then 
                              exit 0
                        else 
                              exit 1
                        fi''',
        poke_interval=30,      #30초
        timeout=60*2,          #2분
        mode='poke',
        soft_fail=False
    )

    sensor_task_by_reschedule = BashSensor(
        task_id='sensor_task_by_reschedule',
        env={'FILE':'/opt/airflow/ingest/bikeList/{{data_interval_end.in_timezone("Asia/Seoul") | ds_nodash }}/bikeList.csv'},
        bash_command=f'''echo $FILE && 
                        if [ -f $FILE ]; then 
                              exit 0
                        else 
                              exit 1
                        fi''',
        poke_interval=60*3,    # 3분
        timeout=60*9,          #9분
        mode='reschedule',
        soft_fail=True
    )

    bash_task = BashOperator(
        task_id='bash_task',
        env={'FILE': '/opt/airflow/ingest/bikeList/{{data_interval_end.in_timezone("Asia/Seoul") | ds_nodash }}/bikeList.csv'},
        bash_command='echo "건수: `cat $FILE | wc -l`"',
    )

    [sensor_task_by_poke,sensor_task_by_reschedule] >> bash_task

서로 인터벌이 달라서 하나는 성공하고 뒤에 성공하는 시간이 차이가 날 것.

파일 없을때파일 있을때

pythonsensor

  • python 함수를 실행시켜 결과가 true이면 만족을 의미

데이터

어제날짜오늘날짜
  • 전일자 값이 언제 올라오는지 확인할 수 없음
  • 따라서 이를 센싱 하려고 함.

실행 코드 및 설명

  • dags_python_sensor.py
from airflow import DAG
from airflow.sensors.python import PythonSensor
import pendulum
from airflow.hooks.base import BaseHook

with DAG(
    dag_id='dags_python_sensor',
    start_date=pendulum.datetime(2024,6,16, tz='Asia/Seoul'),
    schedule='10 1 * * *',
    catchup=False
) as dag:
    def check_api_update(http_conn_id, endpoint, check_date, **kwargs):
        import requests
        import json
        # dict를 get으로 가지고 오면 값이 없으면 None으로 뜸 
        connection = BaseHook.get_connection(http_conn_id)
        url = f'http://{connection.host}:{connection.port}/{endpoint}/1/5/{check_date}'
        print(f'url: {url}')
        response = requests.get(url)
        contents = json.loads(response.text)
        print(f'response: {contents}')
        code = contents.get('CODE')

        # 에러코드 INFO-200: 해당되는 데이터가 없습니다.
        # 미 갱신시 INFO-200으로 리턴됨
        if code is not None and code == 'INFO-200':
            print('상태코드: INFO-200, 데이터 미갱신')
            return False
        elif code is None:
            keys = list(contents.keys())
            rslt_code = contents.get(keys[0]).get('RESULT').get('CODE')

            # 정상 조회 코드 (INFO-000)
            if rslt_code == 'INFO-000':
                print('상태코드: INFO-000, 데이터 갱신 확인')
                return True
        else:
            print('상태코드 불분명')
            return False

    sensor_task = PythonSensor(
        task_id='sensor_task',
        python_callable=check_api_update,
        op_kwargs={'http_conn_id':'openapi.seoul.go.kr',
                   'endpoint':'{{var.value.apikey_openapi_seoul_go_kr}}/json/tbCycleRentUseDayInfo',
                   'check_date':'{{data_interval_start.in_timezone("Asia/Seoul") | ds_nodash }}'},
        poke_interval=600,   #10분
        mode='reschedule'
    )
  • 제대로 조회가 되면 dag 실행이 종료됨.
profile
신윤재입니다

0개의 댓글