ex)
- 1개의 operator에 2개 이상의 tasks를 담으면 안된다.(1 operator - 1 tasks)
- 문제가 생기는 경우 이전 상태를 다시 수행하기 위해서 원자성을 띠도록 1-1구조.
다중 분리, 연결 요소를 구성하는 것이 중요!
creating_table
)is_api_available
)extracting_user
)processing_user
)storing_user
)creating_table
)Admin > Connections
# Import DAG object - 데이터 파이프라인 생성 시 사용
from airflow.models import DAG
# SQlite operator 사용 가능, sqlite DB와 interact
from airflow.providers.sqlite.operators.sqlite import SqliteOperator
default_args = {
'start_date': datetime(2020, 1, 1)
}
with DAG('user_processing', schedule_interval = '@daily',
default_args = default_args,
catchup = False) as dag:
creating_table = SqliteOperator(
# unique ID
task_id = 'creating_table',
sqlite_conn_id = 'db_sqlite',
sql = '''
CREATE TABLE users (
firstname TEXT NOT NULL,
lastname TEXT NOT NULL,
country TEXT NOT NULL,
username TEXT NOT NULL,
password TEXT NOT NULL,
email TEXT NOT NULL PRIMARY KEY
);
'''
)
$ airflow tasks test user_processing creating_table 2020-01-01
Task 성공 여부를 보여준다.
Table이 sqlite3에 있는지 검사
$ sqlite airflow.db
sqlite> .table
sqlite> SELECT * FROM users
테이블이 생성된 것을 확인할 수 있다.
is_api_available
)Admin > Connections
# HTTP Sensor 사용해서 API가 동작 중인지 여부를 파악
from airflow.providers.http.sensors.http import HttpSensor
# HTTP에 의해 URL API가 Checked
# 앞에 user_api conn Id의 Host가 "https://randomuser.me/" 이므로 endpoint 'api/를 받으면
# "http://randomuser.me/api/" 가 된다.
is_api_available = HttpSensor(
task_id = 'is_api_available',
http_conn_id = 'user_api',
endpoint = 'api/'
)
$ airflow tasks test user_processing is_api_available 2020-01-01
is_available
이 문제 없으며 이후에 workflow에서도 동작할 것임을 알 수 있다.
extracting_user
)'INFO - {"results":[{"gender":"male","name":{"title":"Mr","first":"Antoni","last":"Fure"},
"location":{"street":{"number":5350,"name":"Bjørndalsjordet"},
"city":"Eide","state":"Oppland","country":"Norway","postcode":"3714",
"coordinates":{"latitude":"52.3386","longitude":"-165.5425"},
"timezone":{"offset":"+5:00","description":"Ekaterinburg, Islamabad, Karachi, Tashkent"}},
"email":"antoni.fure@example.com",
"login":{"uuid":"13a81286-cbb5-4465-8eb9-a9ea2abe8d5b","username":"sadlion681","password":"toejam","salt":"UUTttzws","md5":"338976bcf8af9c57916fd12c1240fd4b","sha1":"8440422e3082d74fc60d9414beda93999a7b6f8a","sha256":"d3d0100c04d8df09eeb054145c3ff331fe8dc0a7e333ce86243a3791d0088103"},
"dob":{"date":"1975-08-28T19:10:39.412Z","age":46},
"registered":{"date":"2018-08-03T18:33:07.186Z","age":3},
"phone":"26795836",
"cell":"91278236",
"id":{"name":"FN","value":"28087545913"},
"picture":{"large":"https://randomuser.me/api/portraits/men/63.jpg","medium":"https://randomuser.me/api/portraits/med/men/63.jpg","thumbnail":"https://randomuser.me/api/portraits/thumb/men/63.jpg"},"nat":"NO"}],
"info":{"seed":"43fc9cf21da23c85","results":1,"page":1,"version":"1.3"}}
# fetch the result of a given page of a given url
from airflow.providers.http.operators.http import SimpleHttpOperator
import json
extracting_user = SimpleHttpOperator(
task_id = 'extracting_user',
http_conn_id = 'user_api',
endpoint = 'api/',
# 데이터의 변경을 하는 것이 아니라 가져오기만 하기 때문에
method = 'GET',
# response를 처리할 수 있도록 허락
response_filter = lambda response : json.loads(response.text),
# URL의 response를 확인할 수 있다.
log_response = True
)
$ airflow tasks test user_processing extracting_user 2020-01-01
processing_user
)# table에 저장하기를 원하는 user정보를 extract하기 위해서 사용
from airflow.operators.python import PythonOperator
from pandas import json_normalize
def _processing_user(ti):
users = ti.xcom_pull(task_ids = ['extracting_user'])
# output empty / output이 예상한 것이 아닌 경우(results가 안들어있는 경우)
if not len(users) or 'results' not in users[0]:
raise ValueError('User is empty')
user = users[0]['results'][0] # json 읽어오기
processed_user = json_normalize({
'firstname': user['name']['first'],
'lastname': user['name']['last'],
'country': user['location']['country'],
'username': user['login']['username'],
'password': user['login']['password'],
'email': user['email']
})
processed_user.to_csv('/tmp/processed_user.csv', index = None, header = False)
processing_user = PythonOperator(
task_id = 'processing_user',
# python operator로부터 호출하고 싶은 operator
python_callable = _processing_user
)
$ airflow tasks test user_processing processing_user 2020-01-01
/tmp/processed_user.csv 파일 생성
설정한firstname, lastname, country, username, password, email
이 표시된다.
Joseph,Jones,Australia,brownduck393,1221,joseph.jones@example.com
storing_user
)BashOperator
사용)# 앞서 extract한 정보를 sqlite에 저장하기 위해서 사용
from airflow.operators.bash import BashOperator
storing_user = BashOperator(
task_id = 'storing_user',
# ','로 분리, 해당 파일의 users를 import해서 airflow의 sqlite에 저장.
bash_command = 'echo -e ".separator ","\n.import /tmp/processed_user.csv users" | sqlite3 /home/airflow/airflow/airflow.db'
)
$ airflow tasks test user_processing storing_user 2020-01-01
sqlite> SELECT * FROM users;
/tmp/processed_user.csv
가 /home/airflow/airflow/airflow.db
에 저장된 것을 확인할 수 있다.하지만 어떤 의존성(Dependency)이 존재하지는 않는 것을 확인할 수 있다.
>>
command를 사용한다.
creating_table >> is_api_available >> extracting_user >> processing_user >> storing_user
저장하고 Airflow Web UI를 새로고침하면 의존성이 갱신된 것을 확인할 수 있다.
CREATE TABLE if not exists users
로 수정하여 Table이 있는 경우에는 Table을 생성하지 않도록 한 후,
Workflow Trigger
시 동작하는 것을 확인할 수 있다.