slot
runing slots + queued slots 은 Slots 값을 넘지 못함
비교 | Poke | Reschedule |
---|---|---|
원리 | DAG이 수행되는 내내 Running Slot을 차지함. 다만 Slot 안에서 Sleep, active를 반복함 | 센서가 동작하는 시기에만 Slot을 차지함. 그 외는 Slot을 차지하지 않음 |
Wait에서 상태 | running | up_for_reschedule |
유리한 적용 시점 | 짧은 센싱 간격 (interval, 초단위) | 긴 센싱 간격, 주로 분단위 Reschedule 될때 스케줄러의 부하 발생 |
pool
모든 Task는 특정 Pool에서 수행되며 Pool은 Slot을 가지고 있음
기본적으로 task 1개당 slot 1개 점유하며 pool을 지정하지 않으면 default_pool 에서 실행됨.
from airflow.sensors.bash import BashSensor
from airflow.operators.bash import BashOperator
from airflow import DAG
import pendulum
with DAG(
dag_id='dags_bash_sensor',
start_date=pendulum.datetime(2024,6,16, tz='Asia/Seoul'),
schedule='0 6 * * *',
catchup=False
) as dag:
sensor_task_by_poke = BashSensor(
task_id='sensor_task_by_poke',
env={'FILE':'/opt/airflow/ingest/bikeList/{{data_interval_end.in_timezone("Asia/Seoul") | ds_nodash }}/bikeList.csv'},
bash_command=f'''echo $FILE &&
if [ -f $FILE ]; then
exit 0
else
exit 1
fi''',
poke_interval=30, #30초
timeout=60*2, #2분
mode='poke',
soft_fail=False
)
sensor_task_by_reschedule = BashSensor(
task_id='sensor_task_by_reschedule',
env={'FILE':'/opt/airflow/ingest/bikeList/{{data_interval_end.in_timezone("Asia/Seoul") | ds_nodash }}/bikeList.csv'},
bash_command=f'''echo $FILE &&
if [ -f $FILE ]; then
exit 0
else
exit 1
fi''',
poke_interval=60*3, # 3분
timeout=60*9, #9분
mode='reschedule',
soft_fail=True
)
bash_task = BashOperator(
task_id='bash_task',
env={'FILE': '/opt/airflow/ingest/bikeList/{{data_interval_end.in_timezone("Asia/Seoul") | ds_nodash }}/bikeList.csv'},
bash_command='echo "건수: `cat $FILE | wc -l`"',
)
[sensor_task_by_poke,sensor_task_by_reschedule] >> bash_task
서로 인터벌이 달라서 하나는 성공하고 뒤에 성공하는 시간이 차이가 날 것.
파일 없을때 | 파일 있을때 |
---|---|
![]() | ![]() |
조회 : 내 인증키 넣어서 조회
http://openapi.seoul.go.kr:8088/(인증키)/json/tbCycleRentUseDayInfo/1/5/{날짜}
데이터 확인 : 오늘 날짜는 안나옴.
어제날짜 | 오늘날짜 |
---|---|
![]() | ![]() |
from airflow import DAG
from airflow.sensors.python import PythonSensor
import pendulum
from airflow.hooks.base import BaseHook
with DAG(
dag_id='dags_python_sensor',
start_date=pendulum.datetime(2024,6,16, tz='Asia/Seoul'),
schedule='10 1 * * *',
catchup=False
) as dag:
def check_api_update(http_conn_id, endpoint, check_date, **kwargs):
import requests
import json
# dict를 get으로 가지고 오면 값이 없으면 None으로 뜸
connection = BaseHook.get_connection(http_conn_id)
url = f'http://{connection.host}:{connection.port}/{endpoint}/1/5/{check_date}'
print(f'url: {url}')
response = requests.get(url)
contents = json.loads(response.text)
print(f'response: {contents}')
code = contents.get('CODE')
# 에러코드 INFO-200: 해당되는 데이터가 없습니다.
# 미 갱신시 INFO-200으로 리턴됨
if code is not None and code == 'INFO-200':
print('상태코드: INFO-200, 데이터 미갱신')
return False
elif code is None:
keys = list(contents.keys())
rslt_code = contents.get(keys[0]).get('RESULT').get('CODE')
# 정상 조회 코드 (INFO-000)
if rslt_code == 'INFO-000':
print('상태코드: INFO-000, 데이터 갱신 확인')
return True
else:
print('상태코드 불분명')
return False
sensor_task = PythonSensor(
task_id='sensor_task',
python_callable=check_api_update,
op_kwargs={'http_conn_id':'openapi.seoul.go.kr',
'endpoint':'{{var.value.apikey_openapi_seoul_go_kr}}/json/tbCycleRentUseDayInfo',
'check_date':'{{data_interval_start.in_timezone("Asia/Seoul") | ds_nodash }}'},
poke_interval=600, #10분
mode='reschedule'
)