The Forex Data Pipeline(Http Sensor, Task test)

우상욱·2024년 3월 3일
0

Airflow

목록 보기
7/22

Forex 데이터 : 외환 거래 데이터

Flow chart of the Forex Data Pipeline


  1. Check availability of forex rates
  2. Check availability of the file having currencies to watch
  3. Download forex rates with Python
  4. Save the forex rates in the HDFS
  5. Create a Hive table to store forex rates from the HDFS
  6. Process forex rates with Spark
  7. Send an Email Notification
  8. Send a Slack Notification

  • HDFS는 분산 파일 시스템으로 원하는만큼 데이터를 저장할 수 있습니다.
  • Namenode와 Datanode가 있습니다.
  • 그리고 HDFS에 있는 데이터를 SPARK로 데이터를 프로세싱합니다.
  • Hive는 Hdfs에 저장한 파일과 상호작용하게 해줍니다. SQL 구문을 사용합ㅎ니다.
  • HDFS에 있는 데이터를 쿼리하기 위해서, SQL을 사용하고, 상응하는 하이브 테이블을 만듭니다.
  • 그리고 Postgres 데이터베이스가 있습니다.
  • Adminer는 Postgres와 쉽게 상호 작용하는 도구입니다.
  • Hue는 Hive와 Hdfs에 있는 데이터를 멋진 대시보드로 확인할 수 있는 도구입니다.
  • 마지막으로는 Airflow를 확인할 수 있습니다.

What is DAG?


  • DAG = Data Pipeline
  • Directed Acyclic Graph
  • DAG는 순환하지 않고, 일방향입니다. 순환한다면 DAG가 아닙니다.
  • 순환한다면 데이터파이프라인의 종료는 없습니다.
  • 각 노드는 TASK로 생각하면 되고, edge는 의존성입니다.

Set Dag


from airflow import DAG

from datetime import datetime, timedelta


default_args = {
    "owner": "airflow",
    "email_on_failure": False, # 실패 시 이메일
    "email_on_retry": False, # retry 시 이메일
    "email": "admin@localhost.com",  # 이메일 주소 설정
    "retries": 1, # retry는 한번만
    "retry_delay": timedelta(minutes=5), # 작업을 retry 하기 전에 5분을 대기
}

# 파라미터로 DAG ID, 데이터 파이프라인 내의 고유 식별자
# 모든 DAG ID는 고유해야합니다.
# 두번째 파라미터는 시작 날짜입니다. 2021년 1월 1일에 첫 일정이 잡힌다는 점입니다.
# schedule_interval은 DAG가 매일 어떤 간격을 가지고 실행될지를 말합니다.
# @daily는 하루 간격으로 실행합니다.
# 기본 인수를 정합니다.
# catchup=False면 시작 날짜 사이와 에러난 날짜에서 트리거하지 않은 DAG를 실행 하지않습니다.
with DAG("forex_data_pipeline", start_date=datetime(2021, 1, 1),
         schedule_interval="@daily", default_args=default_args, catchup=False) as dag: 

What is an Operator?


  • Opertator는 Task입니다.
  • 연산자를 로직을 캡슐화하는 개체라고 생가하면 됩니다. 즉 실행하고자 하는 작업을 말합니다.
  • 각각의 작업은 Operator가 있습니다.
  • 두 번째로 기억해야할 것은, 실행하고 싶은 작업 당 거기에 맞는 오퍼레이터가 있다는 점입니다.
  • airflow에는 엄청나게 많은 Operator가 있습니다.

3 types of Operator

  • Action Operator
    • Python Operator
    • Bash Operator
  • Transfer Operator
    데이터를 원천에서 목적지로 전송할 수 있게 해주는 Operator
    • MySQL Operator
    • Postgres Operator
  • Sensor Operator
    일이 생길 때까지 기다렸다가, 다음으로 넘어가는
    • 파일이 파일 시스템 내 특정 위치에 도달하길 기다린다면
    • File Sensor를 이용합니다.
    • 기본값으로 파일 센서가 기대하는 파일이 매 60초마다 있는지 확인합니다.

Https Sensor


Http Sensor는 Http에 접근할 때, 해당 url에 접근이 가능한지를 센서를 통해 계속해서 기다릴 수 있습니다.

from airflow.providers.http.sensors.http import HttpSensor
    is_forex_rates_available = HttpSensor(
        task_id="is_forex_rates_available",
        http_conn_id="forex_api",
        endpoint="marclamberti/f45f872dea4dfd3eaa015a4a1af4b39b",
        response_check=lambda response: "rates" in response.text,
        # 조건이 참인지 거짓인지, 5초마다 확인하게 합니다.
        poke_interval=5,
        # 20초가 지나면 타임아웃을 받고, 작업은 실패가 됩니다.
        # 만약 이걸 입력하지 않으면 기본으로 7일 간 자동으로 센서가 동작합니다.
        timeout=20,
    )
    

Add connection


  • forex_api 추가
  • 테스트를 위해, airflow 컨테이너 접속
docker exec -it 6dc32bb08a02 /bin/bash

항상 모든 task는 테스트를 진행합니다.
이후 전체 dag가 잘 작동하는지 마지막에 테스트합니다.

  • test
airflow tasks {dag_id} {task_id} {execution_date}
airflow tasks test forex_data_pipeline is_forex_rates_available 2022-01-01

이렇게 하면 다른 작업의 종속성 없이 해당 task를 테스트할 수 있습니다.

profile
데이터엔지니어

0개의 댓글