API를 활용한 Airflow DAG 작성

yjbenkang·2024년 11월 14일

Yahoo Finance API를 호출해서 애플 주식을 읽어오는 Full Refresh 기반의 DAG 작성

구현 DAG의 세부 사항 - Full Refresh로 구현

  1. Yahoo Finance API를 호출하여 애플 주식 정보 수집 (지난 30일)
  2. Redshift상의 테이블로 1에서 받은 레코드들을 적재

Extract/Transform: Yahoo Finance API 호출

  • Yahoo Finance API를 호출하여 애플 주식 정보 수집하고 파싱
    • 기본으로 지난 한달의 주식 가격을 리턴해줌
      import yfinance as yf
      @task
      def get_historical_prices(symbol):
      ticket = yf.Ticker(symbol)
      data = ticket.history()
      records = []
      for index, row in data.iterrows():
      date = index.strftime('%Y-%m-%d %H:%M:%S')
      records.append([date, row["Open"], row["High"], row["Low"], row["Close"], row["Volume"]])
      return records

Load: Redshift의 테이블을 업데이트

  • Full Refresh로 구현
    • 매번 테이블을 새로 만드는 형태로 구성
  • 트랜잭션 형태로 구성 (NameGender DAG와 동일)
from airflow import DAG
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from datetime import datetime
from pandas import Timestamp

import yfinance as yf
import pandas as pd
import logging


def get_Redshift_connection(autocommit=True):
    hook = PostgresHook(postgres_conn_id='redshift_dev_db')
    conn = hook.get_conn()
    conn.autocommit = autocommit
    return conn.cursor()


@task
def get_historical_prices(symbol):
    ticket = yf.Ticker(symbol)
    data = ticket.history()
    records = []

    for index, row in data.iterrows():
        date = index.strftime('%Y-%m-%d %H:%M:%S')

        records.append([date, row["Open"], row["High"], row["Low"], row["Close"], row["Volume"]])

    return records

@task
def load(schema, table, records):
    logging.info("load started")
    cur = get_Redshift_connection()
    try:
        cur.execute("BEGIN;")
        cur.execute(f"DROP TABLE IF EXISTS {schema}.{table};")
        cur.execute(f"""
CREATE TABLE {schema}.{table} (
    date date,
    "open" float,
    high float,
    low float,
    close float,
    volume bigint
);""")
        for r in records:
            sql = f"INSERT INTO {schema}.{table} VALUES ('{r[0]}', {r[1]}, {r[2]}, {r[3]}, {r[4]}, {r[5]});"
            print(sql)
            cur.execute(sql)
        cur.execute("COMMIT;")   # cur.execute("END;")
    except Exception as error:
        print(error)
        cur.execute("ROLLBACK;")
        raise

    logging.info("load done")


with DAG(
    dag_id = 'UpdateSymbol',
    start_date = datetime(2023,5,30),
    catchup=False,
    tags=['API'],
    schedule = '0 10 * * *'
) as dag:

    results = get_historical_prices("AAPL")
    load("kyongjin1234", "stock_info", results)

실행 데모

  • 앞서 코드 실행

    • Docker로 로그인해서 yfinance 모듈 설치가 필요

    • airflow dags test UpdateSymbol 2023-05-20

  • docker contaniner에 루트 유저로 로그인하는 방법

구현 DAG의 세부 사항 - Incremental Update로 구현

  1. Yahoo Finance API를 호출하여 애플 주식 정보 수집 (지난 30일)
  2. Redshift 상의 테이블로 1에서 받은 레코드들을 적재하고 중복 제거
    a. 매일 하루치의 데이터씩 늘어남

Load :Redshift의 테이블을 업데이트

  • Incremental Update로 구현
    • 임시 테이블 생성하면서 현재 테이블의 레코드를 복사 (CREATE TEMP TABLE ... AS SELECT)
    • 임시 테이블로 Yahoo Finance API로 읽어온 레코드를 적재
    • 원본 테이블을 삭제하고 새로 생성
    • 원본 테이블에 임시 테이블의 내용을 복사 (이 때 SELECT DISTINCT *를 사용하여 중복 제거)
  • 트랜잭션 형태로 구성 (NameGender DAG와 동일)
def _create_table(cur, schema, table, drop_first):
    if drop_first:
        cur.execute(f"DROP TABLE IF EXISTS {schema}.{table};")
    cur.execute(f"""
CREATE TABLE IF NOT EXISTS {schema}.{table} (
    date date,
    "open" float,
    high float,
    low float,
    close float,
    volume bigint
);""")


@task
def load(schema, table, records):
    logging.info("load started")
    cur = get_Redshift_connection()
    try:
        cur.execute("BEGIN;")
        # 원본 테이블이 없으면 생성 - 테이블이 처음 한번 만들어질 때 필요한 코드
        _create_table(cur, schema, table, False)
        # 임시 테이블로 원본 테이블을 복사
        cur.execute(f"CREATE TEMP TABLE t AS SELECT * FROM {schema}.{table};")
        for r in records:
            sql = f"INSERT INTO t VALUES ('{r[0]}', {r[1]}, {r[2]}, {r[3]}, {r[4]}, {r[5]});"
            print(sql)
            cur.execute(sql)

        # 원본 테이블 생성
        _create_table(cur, schema, table, True)
        # 임시 테이블 내용을 원본 테이블로 복사
        cur.execute(f"INSERT INTO {schema}.{table} SELECT DISTINCT * FROM t;")
        cur.execute("COMMIT;")   # cur.execute("END;")
    except Exception as error:
        print(error)
        cur.execute("ROLLBACK;") 
        raise
    logging.info("load done")


with DAG(
    dag_id = 'UpdateSymbol_v2',
    start_date = datetime(2023,5,30),
    catchup=False,
    tags=['API'],
    schedule = '0 10 * * *'
) as dag:

    results = get_historical_prices("AAPL")
    load("kyongjin1234", "stock_info_v2", results)
profile
keep growing

0개의 댓글