Airflow - airflow.cfg 및 API 활용한 DAG작성

yjbenkang·2024년 11월 15일

airflow.cfg

  1. DAGs 폴더는 어디에 지정되는가?
    a. 기본적으로는 Airflow가 설치된 디렉토리 밑의 dags 폴더가 되며 dags_folder 키에 저장됨
  2. DAGs 폴더에 새로운 Dag를 만들면 언제 실제로 Airflow 시스템에서 이를 알게 되나 ? 이 스캔 주기를 결정해주는 키의 이름이 무엇인가?
    a. dag_dir_list_interval (기본값은 300 = 5분)
  3. 이 파일에서 Airflow를 API 형태로 외부에서 조작하고 싶다면 어느 섹션을 변경해야하는가 ?
    a. api 섹션의 auth_backend를 airflow.api.auth.backend.basic_auth로 변경(Id/password인증방식)
  4. Variable에서 변수의 값이 encrypted가 되려면 변수의 이름에 어떤 단어들이 들어가야하는데 이 단어들은 무엇일까?
    a. password, secret, passwd, authorization, api_key, apikey, access_token
  5. 이 환경 설정 파일이 수정되었다면 이를 실제로 반영하기 위해서 해야 하는 일은?
    a. sudo systemctl restart airflow-webserver
    b. sudo systemctl restart airflow-scheduler
    c. 만약 docker일 경우 수정후 저장 후 docker engine에서 해당 서비스를 중단 후 다시 실행
  • airflow db init을 선택하면 metadata를 초기화하는 일만 하지 바꾼걸 반영하는건 아니다. 백엔드가 바뀌었다 메타데이터 디비가 바뀌었다할 때 실행하는 명령어이다.
  1. Metadata DB의 내용을 암호화하는데 사용되는 키는 무엇인가?
    a. fernet_key

Airflow와 타임존

  • airflow.cfg에는 두 종류의 타임존 관련 키가 존재
    a. default_timezone : 언제 DAG가 실행되는지, 스케쥴이 이걸 따라감.
    b. default_ui_timezone

  • start_date, end_date, schedule
    a. default_timezone에 지정된 타임존을 따름

  • execution_date와 로그 시간
    a. 항상 UTC를 따름
    b. 즉 execution_date를 사용할 때는 타임존을 고려해서 변환 후 사용필요

현재로 가장 좋은 방법은 UTC를 일관되게 사용하는 것으로 보임. 섞어서 쓰면 혼란이 옴.

dags 폴더에서 코딩시 주의할 점

  • Airflow는 dags폴더를 주기적으로 스캔함
    [core]
    dags_folder = /var/lib/airflow/dags
    #How often (in seconds) to scan the DAGs directory for new files. Default to 5 minutes.
    dag_dir_list_interval = 300

  • 이 때 DAG 모듈이 들어있는 모든 파일들의 메인 함수가 실행이 됨

    • 이 경우 본의 아니게 개발 중인 테스트 코드도 실행될 수 있음
    from airflow import DAG
    ...
    cur.execute("DELETE FROM ....")

ContryInfo DAG 작성

  • https://restcountries.com/ 을 참고

    • 별도의 API Key가 필요없음
  • https://restcountries.com/v3/all 를 호출하여 나라별로 다양한 정보를 얻을 수 있다.

  • Full Refresh로 구현해서 매번 국가 정보를 읽어오게 할것

  • API 결과에서 아래 3개의 정보를 추출하여 Redshift에 스키마 밑에 테이블 생성

    • country -> ["name"]["official"]
    • population -> ["population"]
    • area -> ["area"]
  • 단 이 DAG는 UTC로 매주 토요일 오전 6시 30분에 실행되게 만들어볼 것

import requests 

@task
def extract_transform():
    url = f"https://restcountries.com/v3/all"
    response = requests.get(url)
    data = response.json()
    records = []

    for row in data:
        records.append([row["name"]["official"], row["population"], row["area"]])
    return records
    
@task
def load(schema, table, records):
    logging.info("load started")
    cur = get_Redshift_connection()
    try:
        cur.execute("BEGIN;")
        cur.execute(f"DROP TABLE IF EXISTS {schema}.{table};")
        cur.execute(f"""
CREATE TABLE {schema}.{table} (
    country varchar(255),
    population bigint,
    area float
);""")
        for r in records:
            sql = f"INSERT INTO {schema}.{table} VALUES (%s, %s, %s);"
            print(sql)
            cur.execute(sql, (r[0], r[1], r[2]))
        cur.execute("COMMIT;")   # cur.execute("END;")
    except Exception as error:
        print(error)
        cur.execute("ROLLBACK;")
        raise

    logging.info("load done")
with DAG(
    dag_id = 'UpdateWorldAPI',
    start_date = datetime(2023,5,30),
    catchup=False,
    tags=['API'],
    schedule_interval='30 6 * * 6', #0 - Sunday, ..., 6 - Saturday
) as dag:

    results = get_worlds_info()
    load("kyongjin1234", "world_info", results)

Open Weathermap API

  • 위도/경도를 기반으로 그 지역의 기후 정보를 알려주는 서비스

  • 무료 계정으로 api key를 받아서 이를 호출시에 사용

만들려는 DAG: 서울 8일 낮/최소/최대 온도 읽기

DAG 구현

  • Open Weathermap의 one call API를 사용해서 서울의 다음 7일간의 낮/최소/최대 온도를 읽어다가 각자 스키마 밑의 weather_forecast라는 테이블로 저장
    • https://openweathermap.org/api/one-call-api 를 호출해서 테이블을 채움
    • weather_forecast라는 테이블이 대상이 됨
      • 여기서 유의할 점은 created_date은 레코드 생성시간으로 자동 채워지는 필드라는 점
CREATE TABLE keeyong.weather_forecast (
  date date primary key,
  temp float, -- 낮 온도
  min_temp float,
  max_temp float,
  created_date timestamp default GETDATE()
);  
  • One-Call API는 결과를 JSON 형태로 리턴해줌

    • 이를 읽어드리려면 requests.get 결과의 text를 JSON으로 변환해 주어야함
    • 아니면 requests.get 결과 오브젝트가 제공해주는 .json()이란 함수 사용
      f = requests.get(link)
      f_js = f.json()
  • 결과 JSON에서 daily라는 필드에 앞으로 8일간 날씨 정보가 들어가 있음

    • daily 필드는 리스트이며 각 레코드가 하나의 날짜에 해당
    • 날짜 정보는 "dt"라는 필드에 들어있음. 이는 epoch라고 해서 1970년 1월 1일 이후 밀리세컨드로 시간을 표시. 이는 아래와 같은 코드로 읽을 수 있는 날짜로 변경 가능
      • datetime.fromtimestamp(d["dt"]).strftime('%Y-%m-%d') #2021-10-09
  • daily라는 리스트에 앞으로 7일간의 온도 정보가 들어옴

    • dt필드가 날짜를 나타냄
    • temp 필드가 온도 정보를 나타냄
      • day
      • min
      • max
      • night
      • eve
      • morn
  • Airflow Connections를 통해 만들어진 Redshift connection

    • 기본 autocommit의 값은 False인 점을 유의
  • 두 가지 방식의 Full Refresh 구현 방식

    • Full Refresh와 INSERT INTO를 사용
    • Full Refresh와 COPY를 사용 -> 나중에 사용해볼 예정

DAG 구현 : Full Refresh

  • API Key는 어디에 저장해야할까 ?
  • Full Refresh
    • 매번 테이블을 지우고 다시 빌드
  • DW상의 테이블은 아래처럼 정의
CREATE TABLE 각자스키마.weather_forecast (
  date date primary key,
  temp float,
  min_temp float,
  max_temp float,
  created_date timestamp default GETDATE()

소스코드

from airflow import DAG
from airflow.models import Variable
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.decorators import task

from datetime import datetime
from datetime import timedelta

import requests
import logging
import json


def get_Redshift_connection():
    # autocommit is False by default
    hook = PostgresHook(postgres_conn_id='redshift_dev_db')
    return hook.get_conn().cursor()

@task
def etl(schema, table):
    api_key = Variable.get("open_weather_api_key")
    # 서울의 위도/경도
    lat = 37.5665
    lon = 126.9780

    # https://openweathermap.org/api/one-call-api
    url = f"https://api.openweathermap.org/data/3.0/onecall?lat={lat}&lon={lon}&appid={api_key}&units=metric&exclude=current,minutely,hourly,alerts"
    response = requests.get(url)
    data = json.loads(response.text)
    """
    {'dt': 1622948400, 'sunrise': 1622923873, 'sunset': 1622976631, 'moonrise': 1622915520, 'moonset': 1622962620, 'moon_phase': 0.87, 'temp': {'day': 26.59, 'min': 15.67, 'max': 28.11, 'night': 22.68, 'eve': 26.29, 'morn': 15.67}, 'feels_like': {'day': 26.59, 'night': 22.2, 'eve': 26.29, 'morn': 15.36}, 'pressure': 1003, 'humidity': 30, 'dew_point': 7.56, 'wind_speed': 4.05, 'wind_deg': 250, 'wind_gust': 9.2, 'weather': [{'id': 802, 'main': 'Clouds', 'description': 'scattered clouds', 'icon': '03d'}], 'clouds': 44, 'pop': 0, 'uvi': 3}
    """
    ret = []
    for d in data["daily"]:
        day = datetime.fromtimestamp(d["dt"]).strftime('%Y-%m-%d')
        ret.append("('{}',{},{},{})".format(day, d["temp"]["day"], d["temp"]["min"], d["temp"]["max"]))

    cur = get_Redshift_connection()
    drop_recreate_sql = f"""DROP TABLE IF EXISTS {schema}.{table};
CREATE TABLE {schema}.{table} (
    date date,
    temp float,
    min_temp float,
    max_temp float,
    created_date timestamp default GETDATE()
);
"""
    insert_sql = f"""INSERT INTO {schema}.{table} VALUES """ + ",".join(ret)
    logging.info(drop_recreate_sql)
    logging.info(insert_sql)
    try:
        cur.execute(drop_recreate_sql)
        cur.execute(insert_sql)
        cur.execute("Commit;")
    except Exception as e:
        cur.execute("Rollback;")
        raise

with DAG(
    dag_id = 'Weather_to_Redshift',
    start_date = datetime(2023,5,30), # 날짜가 미래인 경우 실행이 안됨
    schedule = '0 2 * * *',  # 적당히 조절
    max_active_runs = 1,
    catchup = False,
    default_args = {
        'retries': 1,
        'retry_delay': timedelta(minutes=3),
    }
) as dag:

    etl("keeyong", "weather_forecast")
profile
keep growing

0개의 댓글