Airflow Task 제어

이재연·2022년 8월 8일
0

Airflow를 사용해서 Task를 수행하던 도중 문제가 생겨서 중단시켜야 하는 경우 어떻게 해야할까?

Airflow의 WebServer를 사용하면 현재 진행중인 Task를 편하게 제어 할 수 있다.
또는 CLI 명령어를 사용해서도 제어 할 수 있다.

Airflow출처 : https://airflow.apache.org/docs/apache-airflow/stable/ui.html

작업중인 Task를 중간에 끝내고 싶다면 해당 Task를 클릭하고 Mark Failed 혹은 Mark Success로 변경하면 수행중인 Task가 종료되고 지정한 상태로 변경된다.

근데 SSH를 통해 원격 서버에서 Task를 실행하는 경우 SSH 접속만 종료되고 실행중인 프로세스가 종료되지 않는 문제가 발생할 수 있다.

이때 SSHOperator를 사용하고 get_pty 파라미터를 True로 지정하면 작업중인 프로세스도 함께 종료 시킬 수 있다.

SSHOperator를 사용하기 위해 apache-airflow-providers-ssh패키지가 설치되어 있는지 확인한다.

pip list

pip list 결과

만약 없다면 https://pypi.org 에서 패키지를 검색해서 install한다.

pip install 패키지명

설치된 SSHOperator를 import하고 사용한다.

ex) SSHOperator를 사용한 DAG

from airflow import DAG
from datetime import datetime
from airflow.providers.ssh.operators.ssh import SSHOperator
from airflow.providers.ssh.hooks.ssh import SSHHook

ssh_hook = SSHHook(remote_host='0.0.0.0', username='username', port=22)

with DAG(
        dag_id = 'test_sample',
        description = 'sample description',
        tags = ['test'],
        schedule_interval = '* * * * *',
        start_date = datetime(2022,2,9),
) as dag:
    t1 = SSHOperator(task_id="ssh_opeator", command="command", ssh_hook=ssh_hook, get_pty=True)

SSHHook

SSHHook은 SSH 접속을 위한 정보를 구성한다.

주요 파라미터
remote_host : 문자열 타입, 연결할 원격 서버를 지정한다.
username : 문자열 타입 (Optional)
password : 문자열 타입 (Optional)
port : 정수 타입 (Optional)

그 외 파라미터 정보

SSHOperator

SSHOperator는 원격 서버에서 수행할 작업을 구성한다.

주요 파라미터
ssh_hook : SSHHook 타입, 지정하면 ssh_conn_id 파라미터를 대체한다. (Optional)
ssh_conn_id : 문자열 타입, SSH 접속에 필요한 정보(ip, username, password 등) (Optional)
remote_host : 문자열 타입, 원격 서버 정보 ssh_conn_id 혹은 ssh_hook에 정의되어 있다면 무시된다. (Optional)
command : 문자열 타입, 원격 서버에서 실행할 명령 (Optional)
get_pty : bool 타입, 의사 터미널 사용 여부 기본값은 False

그 외 파라미터 정보

0개의 댓글