데브코스 43일차(3) - 과제2) 세계 나라 정보 API를 사용한 DAG 작성

Pori·2023년 12월 13일
0

데엔

목록 보기
36/47

DAG 작성에 필요한 정보들

  • end-point : https://restcountries.com/v3/all
  • Full Refresh로 구현해서 매번 국가 정보를 읽어오게 하기
  • Redshift에 테이블 생성
    • country -> [“name”][“official”]
    • population -> [“population”]
    • area -> [“area”]
  • 이 DAG는 UTC로 매주 토요일 오전 6시 30분에 실행되게 만들어볼 것

DAG Code

  • Redshift coneection : airflow의 webUI를 활용하여 connection을 미리 연결해두었다.
# Redshift Connection
from airflow.providers.postgres.hooks.postgres import PostgresHook

def get_Redshift_connection(autocommit = True):
    hook = PostgresHook(postgres_conn_id='redshift_dev_db')
    conn = hook.get_conn()
    conn.autocommit = autocommit
    return conn.cursor()
  • Extarct,Transform
    : 변환작업이 많지않아 한번에 수행하였다, Json데이터를 쉽게 다루도록 해주는 .json()을 활용해서 진행하였다.
@task
def get_country_info(url):
    # requests
    data = requests.get(url)
    records = []
    for row in data.json():
        records.append([row["name"]["official"], row["population"], row["area"]])
    return records
  • Load
    : 가장 에러가 많이 발생했던 메서드이다. 아래는 에러가 주로 발생했던 부분들이다.
    1. cur에 위에서 생성한 conn.cursor() 를 넣어주어야한다.
    2. country varchar(150) : 길이가 official이기 때문에 매우 길다.
    3. cur.execute(sql, (r[0], r[1], r[2])) : 값들 중에 's로 끝나는 단어에서 에러가 많이 발생하였다. 원래는 sql에 값들을 모두 넣고 진행하였으나, GPT의 추천을 받아 다음과 같이 작성하였고 해결하였다.
@task
def load(schema, table, records):
    # load start
    logging.info("load started")

    cur = get_Redshift_connection()
    try:
        cur.execute("BEGIN;")
        cur.execute(f"DROP TABLE IF EXISTS {schema}.{table};")
        cur.execute(f"""
        CREATE TABLE {schema}.{table}(
            country varchar(150),
            population bigint,
            area varchar(150)
        )
        ;""")
        for r in records:
            sql = f"INSERT INTO {schema}.{table} VALUES (%s, %s::bigint, %s);"
            print(sql, (r[0], r[1], r[2]))
            cur.execute(sql, (r[0], r[1], r[2]))
        cur.execute("COMMIT;")
    except Exception as error:
        print(error)
        cur.execute("ROLLBACK;")
        raise
    
    logging.info("load done")
  • DAG 정의 및 함수 호출 : schedule에 맞추어 실행되도록 하였다.
with DAG(
    dag_id='CountryInfo',
    start_date=datetime(2023, 12, 13),
    schedule='30 6 * * Sat',
    max_active_runs=1,
    catchup=False,
    default_args={
        'retries': 1,
        'retry_delay': timedelta(minutes=3),
        # 'on_failure_callback': slack.on_failure_callback,
    }
) as dag:
    url = Variable.get("country_url")
    country_data=get_country_info(url)
    load("areacmzl","country_info",country_data)
  • 결과 : 정상적으로 값들이 입력되었다!

에러 해결

  • VSCODE로 WSL의 환경을 불러오다가 permission Error가 발생
    : root계정으로 접근하지 않은 점이 문제였다. root로 접속하거나,
    sudo chown -R <계정명> <작업폴더> 권한을 변경해주었다.

  • airflow dags test <DAG_id> <time>을 이용하여 test 진행 시에 DAG_id를 DAG파일의 이름으로 착각하여 에러가 발생하였다.
    : DAG_id를 파일명과 동일하게 수정하였다.

참고

0개의 댓글