1) dag 만들기
from airflow import DAG
import pendulum
from airflow.operators.email import EmailOperator
# DAG that sends a notification e-mail; cron "0 8 1 * *" fires at 08:00 on
# the 1st day of every month.
with DAG(
    dag_id="dags_email_operator",
    schedule="0 8 1 * *",
    start_date=pendulum.datetime(2024, 6, 1, tz="Asia/Seoul"),
    catchup=False,
) as dag:
    send_email_task = EmailOperator(
        task_id='send_email_task',  # task id
        to='email 주소 ',  # recipient address (placeholder text; replace before use)
        subject='Airflow 성공메일',  # mail subject
        html_content='Airflow 작업이 완료되었습니다',  # mail body
    )
2) email 서버 구축하기
구글 계정관리 -> 앱비밀번호 검색 -> 앱 이름 설정 후 토큰값 복사해놓기
docker-compose.yaml 파일 수정(추가해주기)
- 여기에 비밀번호를 그대로 적으면 git에 노출되므로, 비밀번호 자리는 ##SMTP_PASSWORD## 로 남겨둔 채 git에 올리기
AIRFLOW__SMTP__SMTP_HOST: 'smtp.gmail.com'
AIRFLOW__SMTP__SMTP_USER: '{gmail 계정}'
AIRFLOW__SMTP__SMTP_PASSWORD: '##SMTP_PASSWORD##'
AIRFLOW__SMTP__SMTP_PORT: 587
AIRFLOW__SMTP__SMTP_MAIL_FROM: '{gmail 계정}'
# NOTE(review): the first four commands are presumably the contents of
# deploy.sh, and the last two are how it is run on the server — confirm.
# Directory of the cloned git repository ({git 파일이름} is a placeholder).
AIRFLOW_HOME=/home/ubuntu/{git 파일이름}
cd $AIRFLOW_HOME
# Pull the latest committed files into the repository directory.
git pull
# Replace every ##SMTP_PASSWORD## marker in docker-compose.yaml, in place,
# with MY_PASSWORD (presumably a stand-in for the real app password).
sed -i 's/##SMTP_PASSWORD##/MY_PASSWORD/g' $AIRFLOW_HOME/docker-compose.yaml
# Make the script executable, then run it.
chmod +x deploy.sh
./deploy.sh
docker compose 재실행
dag 실행시 원하는 메일로 이메일이 가게됨.
from airflow import DAG
import pendulum
from airflow.operators.python import PythonOperator
import random
# DAG with a single PythonOperator task; cron "30 6 * * *" fires daily at 06:30.
with DAG(
    dag_id="dags_python_operator",
    schedule="30 6 * * *",
    start_date=pendulum.datetime(2024, 6, 16, tz="Asia/Seoul"),
    catchup=False,
) as dag:

    def select_fruit():
        """Print one fruit name chosen at random from a fixed list."""
        fruit = ['APPLE', 'BANANA', 'ORANGE', 'AVOCADO']
        # random.choice derives the range from the list itself, so the pick
        # stays valid if the list is later lengthened or shortened.
        print(random.choice(fruit))

    py_t1 = PythonOperator(
        task_id='py_t1',
        python_callable=select_fruit,
    )

    # Bare reference to the only task; it can be omitted when no task
    # dependencies need to be declared.
    py_t1
def regist2(name, sex, *args, **kwargs):
    """Print a registration record to stdout.

    Args:
        name: Value printed after the name label.
        sex: Value printed after the sex label.
        *args: Extra positional options, printed together as a tuple.
        **kwargs: Optional ``email`` and ``phone`` entries. A missing or
            falsy value is printed as ``'empty'``.
    """
    print(f'이름: {name}')
    print(f'성별: {sex}')
    print(f'기타옵션들: {args}')
    # .get() rather than [] so an omitted key falls back to 'empty' instead
    # of raising KeyError.
    email = kwargs.get('email') or 'empty'
    phone = kwargs.get('phone') or 'empty'
    # Both values are always non-empty here, so they are printed directly.
    print(email)
    print(phone)
from airflow import DAG
import pendulum
from airflow.operators.python import PythonOperator
from common.common_func import regist2
# DAG that calls the imported regist2 with both positional and keyword
# arguments; cron "30 6 * * *" fires daily at 06:30.
with DAG(
    dag_id="dags_python_with_op_kwargs",
    schedule="30 6 * * *",
    start_date=pendulum.datetime(2024, 6, 14, tz="Asia/Seoul"),
    catchup=False,
) as dag:
    regist2_t1 = PythonOperator(
        task_id='regist2_t1',
        python_callable=regist2,
        # Positional arguments handed to the callable, in order.
        op_args=['hjkim', 'man', 'kr', 'seoul'],
        # Keyword arguments handed to the callable.
        op_kwargs={'email': 'hjkim_sun@naver.com', 'phone': '010'},
    )

    regist2_t1
from airflow import DAG
import pendulum
from airflow.decorators import task
# DAG whose single task is declared with the @task decorator and called with
# arguments; cron "0 2 * * 1" fires at 02:00 every Monday.
with DAG(
    dag_id="dags_python_decorator_with_param",
    schedule="0 2 * * 1",
    start_date=pendulum.datetime(2024, 6, 14, tz="Asia/Seoul"),
    catchup=False,
) as dag:

    @task(task_id="python_task_1")
    def regist3(name, sex, *args, **kwargs):
        """Print a registration record, then dump all keyword arguments.

        Args:
            name: Value printed after the name label.
            sex: Value printed after the sex label.
            *args: Extra positional options, printed together as a tuple.
            **kwargs: Optional ``email`` and ``phone`` entries. A missing
                or falsy value is printed as ``'empty'``.
        """
        from pprint import pprint

        print(f'이름: {name}')
        print(f'성별: {sex}')
        print(f'기타옵션들: {args}')
        # .get() rather than [] so an omitted key falls back to 'empty'
        # instead of raising KeyError.
        email = kwargs.get('email') or 'empty'
        phone = kwargs.get('phone') or 'empty'
        print(f'email: {email}')
        print(f'phone: {phone}')
        # NOTE(review): when run as a task, kwargs presumably also holds
        # values injected by Airflow — confirm.
        pprint(kwargs)

    python_task_1 = regist3(
        'hjkim', 'man', 'seoul',
        email='hjkim_sun@naver.com', phone='010',
    )
def get_sftp():
    """Print a message announcing that the SFTP job is starting."""
    print('sftp 작업을 시작합니다')
def regist(name, sex, *args):
    """Print a registration record to stdout.

    Args:
        name: Value printed after the name label.
        sex: Value printed after the sex label.
        *args: Extra positional options, printed together as a tuple.
    """
    print(f'이름: {name}')
    print(f'성별: {sex}')
    print(f'기타옵션들: {args}')
def regist2(name, sex, *args, **kwargs):
    """Print a registration record to stdout.

    Args:
        name: Value printed after the name label.
        sex: Value printed after the sex label.
        *args: Extra positional options, printed together as a tuple.
        **kwargs: Optional ``email`` and ``phone`` entries. A missing or
            falsy value is printed as ``'empty'``.
    """
    print(f'이름: {name}')
    print(f'성별: {sex}')
    print(f'기타옵션들: {args}')
    # .get() rather than [] so an omitted key falls back to 'empty' instead
    # of raising KeyError.
    email = kwargs.get('email') or 'empty'
    phone = kwargs.get('phone') or 'empty'
    # Both values are always non-empty here, so they are printed directly.
    print(email)
    print(phone)
from airflow import DAG
import pendulum
from airflow.operators.python import PythonOperator
from common.common_func import get_sftp
# DAG whose single task runs the imported get_sftp function; cron
# "30 6 * * *" fires daily at 06:30.
with DAG(
    dag_id="dags_python_import_func",
    schedule="30 6 * * *",
    start_date=pendulum.datetime(2024, 6, 17, tz="Asia/Seoul"),
    catchup=False,
) as dag:
    task_get_sftp = PythonOperator(
        task_id='task_get_sftp',
        python_callable=get_sftp,
    )
from airflow import DAG
import pendulum
from airflow.decorators import task
# DAG whose single task is declared with the @task decorator; cron
# "0 2 * * 1" fires at 02:00 every Monday.
with DAG(
    dag_id="dags_python_task_decorator",
    schedule="0 2 * * 1",
    # Timezone name corrected from the misspelled "Asia/Suoul", which is not
    # a valid zone name.
    start_date=pendulum.datetime(2024, 6, 14, tz="Asia/Seoul"),
    catchup=False,
) as dag:

    @task(task_id="python_task_1")
    def print_context(some_input):
        """Print the given input value."""
        print(some_input)

    python_task_1 = print_context('task_decorator 실행')