The Forex Data Pipeline(Python Operator)

우상욱·2024년 3월 10일
0

Airflow

목록 보기
9/22

이번에는 에어플로우에서 가장 흔하게 사용되는 PythonOperator를 사용해볼 겁니다.
Python으로 forex rates 파일을 다운로드 해보겠습니다.

Python Operator

def download_rates():
    BASE_URL = "https://gist.githubusercontent.com/marclamberti/f45f872dea4dfd3eaa015a4a1af4b39b/raw/"
    ENDPOINTS = {
        'USD': 'api_forex_exchange_usd.json',
        'EUR': 'api_forex_exchange_eur.json'
    }
    with open('/opt/airflow/dags/files/forex_currencies.csv') as forex_currencies:
        reader = csv.DictReader(forex_currencies, delimiter=';')
        for idx, row in enumerate(reader):
            base = row['base']
            with_pairs = row['with_pairs'].split(' ')
            indata = requests.get(f"{BASE_URL}{ENDPOINTS[base]}").json()
            outdata = {'base': base, 'rates': {}, 'last_update': indata['date']}
            for pair in with_pairs:
                outdata['rates'][pair] = indata['rates'][pair]
            with open('/opt/airflow/dags/files/forex_rates.json', 'a') as outfile:
                json.dump(outdata, outfile)
                outfile.write('\n')

함수를 정의하고, 파이썬 함수를 PythonOperator에서 python_callable에 작성합니다.

downloading_rates = PythonOperator(
        task_id="downloading_rates", python_callable=download_rates
    )

다시 docker 컨테이너로 접속해서, 테스트합니다.

airflow tasks test forex_data_pipeline downloading_rates 2022-01-01

profile
데이터엔지니어

0개의 댓글