: pythonOperator를 활용하여 쉽게 DAG를 작성 가능하다. 두가지 방법을 소개한다.
from airflow.operators.python import PythonOperator
t1 = PythonOperator(
dag = dag,
task_id='task_id',
python_callable = python_func,
params={
'table':'test_table',
'schema':'raw_data'
},
)
# python function
def python_func(**cxt):
table = cxt["params"]["table"]
schema = cxt["params"]["schema"]
ex_datge = cxt["execution_date"]
from airflow.decorators import task
dag = DAG(
dag_id = 'HelloWorld',
...
)
# python functions
@task
def print_hello():
print("hello!")
return "hello!"
def print_goodbye():
print("goodbye!")
return "goodbye!"
with DAG(
dag_id = "HelloWorld",
...
)as dag:
# taskID == function_name
print_hello() >> print_goodbye()
참고) 중요한 DAG 파라미터들
max_active_runs
: 동시에 수행 가능한 DAG의 수max_active_tasks
: 동시에 수행되는 task들의 수catchup
: DAG의 start_date 이전에 밀린 작업들의 수행 여부 (default = True): 두 기능 모두 Airflow의 webUI를 통해 쉽게 설정 가능하다.
from airflow.providers.postgres.hooks.postgres import PostgresHook
def get_Redshift_connection(autocommit=True):
# Connection에서 수행한 연결의 id를 적어준다.
hook = PostgresHook(postgres_conn_id='<Connection id>')
conn = hook.get_conn()
conn.autocommit = autocommit
return conn.cursor()
from airflow.models import Variable
코드로 사용가능하다.태스크들간에 데이터를 주고 받기 위한 방식
Operator의 리턴값을 Operator에서 읽는 형태
메타데이터 DB에 저장되기 때문에 큰 데이터는 불가능하다.
예제 코드 (Extract->Transform->Load 순으로 진행된다.)
def extract(**context):
link = context["params"]["url"]
task_instance = context['task_instance']
execution_date = context['execution_date']
logging.info(execution_date)
f = requests.get(link)
return (f.text)
# extract의 리턴값이 transform의 입력으로 받아진다.
def transform(**context):
logging.info("Transform started")
# xcom은 params가 아니다. task_instance를 통해서 가져옴
text = context["task_instance"].xcom_pull(key="return_value", task_ids="extract")
...
return records
# Operator 정의
extract = PythonOperator(
task_id = 'extract',
python_callable = extract,
params = {
'url': Variable.get("csv_url")
},
dag = dag)
transform = PythonOperator(
task_id = 'transform',
python_callable = transform,
...
dag = dag)
load = PythonOperator(
task_id = 'load',
python_callable = load,
...
dag = dag)
extract >> transform >> load
: 작업을 하다보면 CLI환경에서 DAG들을 테스트하고, 작업을 수행해야하는 경우가 발생한다.
docker exec -it <container_id> sh
airflow dags list
airflow tasks list <DAG_id>
airflow variables list
airflow variables get <variable>
airflow tasks list UpdateSymbol (dag-id)
airflow dags test UpdateSymbol 2023-05-30