이번에는 에어플로우에서 가장 흔하게 사용되는 PythonOperator를 사용해볼 겁니다.
Python으로 forex rates 파일을 다운로드 해보겠습니다.
def download_rates():
BASE_URL = "https://gist.githubusercontent.com/marclamberti/f45f872dea4dfd3eaa015a4a1af4b39b/raw/"
ENDPOINTS = {
'USD': 'api_forex_exchange_usd.json',
'EUR': 'api_forex_exchange_eur.json'
}
with open('/opt/airflow/dags/files/forex_currencies.csv') as forex_currencies:
reader = csv.DictReader(forex_currencies, delimiter=';')
for idx, row in enumerate(reader):
base = row['base']
with_pairs = row['with_pairs'].split(' ')
indata = requests.get(f"{BASE_URL}{ENDPOINTS[base]}").json()
outdata = {'base': base, 'rates': {}, 'last_update': indata['date']}
for pair in with_pairs:
outdata['rates'][pair] = indata['rates'][pair]
with open('/opt/airflow/dags/files/forex_rates.json', 'a') as outfile:
json.dump(outdata, outfile)
outfile.write('\n')
함수를 정의하고, 파이썬 함수를 PythonOperator에서 python_callable
에 작성합니다.
downloading_rates = PythonOperator(
task_id="downloading_rates", python_callable=download_rates
)
다시 docker 컨테이너로 접속해서, 테스트합니다.
airflow tasks test forex_data_pipeline downloading_rates 2022-01-01