airflow 오퍼레이터

yoon__0_0·2024년 6월 17일
0

이어드림 수업

목록 보기
63/103

Email operators

email operators 공식문서

1) dag 만들기

from airflow import DAG
import pendulum
from airflow.operators.email import EmailOperator

with DAG(
    dag_id="dags_email_operator",
    schedule="0 8 1 * *",
    start_date=pendulum.datetime(2024, 6, 1, tz="Asia/Seoul"),
    catchup=False
) as dag:
    send_email_task = EmailOperator(
        task_id='send_email_task', # task id
        to='email 주소 ', # 주소 
        subject='Airflow 성공메일', # 제목
        html_content='Airflow 작업이 완료되었습니다' # 내용
    )

2) email 서버 구축하기

  • G-mail → 설정 → 모든 설정보기 → 전달및POP/IMAP →IMAP사용
  • 구글 계정관리 -> 앱비밀번호 검색 -> 앱 이름 설정 후 토큰값 복사해놓기

  • docker-compose.yaml 파일 수정(추가해주기)
    - 여기에 비빌번호를 노출시키면 git에 올라가니까 비밀번호는 빼고 git에 올리기

AIRFLOW__SMTP__SMTP_HOST: 'smtp.gmail.com' AIRFLOW__SMTP__SMTP_USER: '{gmail 계정}'
AIRFLOW__SMTP__SMTP_PASSWORD: '##SMTP_PASSWORD##' AIRFLOW__SMTP__SMTP_PORT: 587 AIRFLOW__SMTP__SMTP_MAIL_FROM: '{gmail 계정}'
  • 서버에서 비밀번호를 넣을 수 있게 depoly.sh 파일 만들기 : 앞으로는 서버에서 git pull 대신 deploy.sh 실행
AIRFLOW_HOME=/home/ubuntu/{git 파일이름} 
cd $AIRFLOW_HOME 
git pull
sed -i 's/##SMTP_PASSWORD##/MY_PASSWORD/g' $AIRFLOW_HOME/docker-compose.yaml
  • sh 실행
chmod +x deploy.sh
./deploy.sh
  • docker compose 재실행

  • dag 실행시 원하는 메일로 이메일이 가게됨.

python operater

python operater 공식문서

from airflow import DAG
import pendulum
from airflow.operators.python import PythonOperator
import random

with DAG(
    dag_id="dags_python_operator",
    schedule="30 6 * * *",
    start_date=pendulum.datetime(2024, 6, 16, tz="Asia/Seoul"),
    catchup=False
) as dag:
    def select_fruit():
        fruit = ['APPLE','BANANA','ORANGE','AVOCADO']
        rand_int = random.randint(0,3)
        print(fruit[rand_int])

    py_t1 = PythonOperator(
        task_id='py_t1',
        python_callable=select_fruit
    )

    py_t1 # 관계 지정을 안하고 싶으면 안적어도 됨. 

파라미터 전달해보기

args, kwargs에 대하여..

  • common/common_func.py
def regist2(name, sex, *args, **kwargs):
    print(f'이름: {name}')
    print(f'성별: {sex}')
    print(f'기타옵션들: {args}')
    email = kwargs['email'] or 'empty'
    phone = kwargs['phone'] or 'empty'
    if email:
        print(email)
    if phone:
        print(phone)
  • dags/ dags_python_with_op_kwargs.py
from airflow import DAG
import pendulum
from airflow.operators.python import PythonOperator
from common.common_func import regist2

with DAG(
    dag_id="dags_python_with_op_kwargs",
    schedule="30 6 * * *",
    start_date=pendulum.datetime(2024, 6, 14, tz="Asia/Seoul"),
    catchup=False
) as dag:
    
    regist2_t1 = PythonOperator(
        task_id='regist2_t1',
        python_callable=regist2,
        op_args=['hjkim','man','kr','seoul'],
        op_kwargs={'email':'hjkim_sun@naver.com','phone':'010'}
    )

    regist2_t1
  • dags/dags_python_decorator_with_param.py
from airflow import DAG
import pendulum
from airflow.decorators import task

with DAG(
    dag_id="dags_python_decorator_with_param",
    schedule="0 2 * * 1",
    start_date=pendulum.datetime(2024, 6, 14, tz="Asia/Seoul"),
    catchup=False,
) as dag:

    @task(task_id="python_task_1")
    def regist3(name, sex, *args, **kwargs):
        print(f'이름: {name}')
        print(f'성별: {sex}')
        print(f'기타옵션들: {args}')
        email = kwargs['email'] or 'empty'
        phone = kwargs['phone'] or 'empty'
        print(f'email: {email}')
        print(f'phone: {phone}')
        from pprint import pprint
        pprint(kwargs)

    python_task_1 = regist3('hjkim', 'man', 'seoul',
                            email='hjkim_sun@naver.com', phone='010')

외부의 파이썬 함수 실행시키기

  • dag 외부에 작성해 놓은 파이썬 모듈을 인식하기 위해서는 해당 경로가 sys.path에 존재해야함.
  • airflow는 기본적으로 3가지 경로를 추가해주고 있음(docker 에 들어가서 airflow info 쳐보기)
    • /opt/airflow/dags
    • /opt/airflow/config
    • /opt/airflow/plugins
  • 따라서 3개중에 하나에 외부 파이썬 파일을 넣어주면 자동적으로 인식함.
  • python 함수 정의
    • plugins/common/common_func.py 만들기
def get_sftp():
    print('sftp 작업을 시작합니다')


def regist(name, sex, *args):
    print(f'이름: {name}')
    print(f'성별: {sex}')
    print(f'기타옵션들: {args}')


def regist2(name, sex, *args, **kwargs):
    print(f'이름: {name}')
    print(f'성별: {sex}')
    print(f'기타옵션들: {args}')
    email = kwargs['email'] or 'empty'
    phone = kwargs['phone'] or 'empty'
    if email:
        print(email)
    if phone:
        print(phone)
  • dags 파일 생성
    - dags/dags_python_import_func.py
    • airflow 서버에서는 기본적으로 plugins를 python path로 잡기 때문에 실행가능 할 것.
from airflow import DAG
import pendulum
from airflow.operators.python import PythonOperator
from common.common_func import get_sftp

with DAG(
    dag_id="dags_python_import_func",
    schedule="30 6 * * *",
    start_date=pendulum.datetime(2024, 6, 17, tz="Asia/Seoul"),
    catchup=False
) as dag:

    task_get_sftp = PythonOperator(
        task_id='task_get_sftp',
        python_callable=get_sftp
    )

task decoretor

  • python operator 의 대체
  • 실행하고자 하는 python 함수 정의 후 @task 데코레이터만 붙여주면 됨 -> python operator 객체가 됨
from airflow import DAG
import pendulum
from airflow.decorators import task

with DAG(
    dag_id="dags_python_task_decorator",
    schedule="0 2 * * 1",
    start_date=pendulum.datetime(2024, 6, 14, tz="Asia/Suoul"),
    catchup=False
) as dag:

    @task(task_id="python_task_1")
    def print_context(some_input):
        print(some_input)

    python_task_1 = print_context('task_decorator 실행')
profile
신윤재입니다

0개의 댓글