[Programmers] Devcourse Data Engineering Final Project, Week 2

주재민 · February 19, 2024

Code Refactoring

  • Each team member was writing DAGs in a different style, which raised the risk of technical debt in terms of maintainability, reusability, and extensibility, so we standardized how DAGs are written
  • Changed the DAGs to use shared plugin and utils modules (a sketch of these helpers follows below)
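
The plugin code itself is not included in this post, so the following is only a minimal sketch of what plugins/utils.py and plugins/s3.py might look like, inferred from how RequestTool.api_request, FileManager.mkdir/remove, and S3Helper.upload are called in the DAGs below. The names, signatures, and URL-building logic are assumptions for illustration, not the team's actual implementation.

# plugins/utils.py (hypothetical sketch, matching the call sites in the DAGs below)
import os
import requests


class RequestTool:
    @staticmethod
    def api_request(base_url: str, verify: bool, params: dict) -> dict:
        # The Seoul open API takes positional path segments:
        #   {base}/{KEY}/{TYPE}/{SERVICE}/{START_INDEX}/{END_INDEX}/...
        # so join the param values onto the base URL in insertion order.
        url = '/'.join([base_url.rstrip('/')] + [str(v) for v in params.values()])
        response = requests.get(url, verify=verify)
        response.raise_for_status()
        return response.json()


class FileManager:
    @staticmethod
    def mkdir(file_path: str) -> None:
        # Create the parent directory of file_path if it does not exist yet.
        os.makedirs(os.path.dirname(file_path), exist_ok=True)

    @staticmethod
    def remove(file_path: str) -> None:
        # Delete the temporary local file after it has been uploaded to S3.
        if os.path.exists(file_path):
            os.remove(file_path)


# plugins/s3.py (hypothetical sketch)
from airflow.providers.amazon.aws.hooks.s3 import S3Hook


class S3Helper:
    @staticmethod
    def upload(aws_conn_id: str, bucket_name: str, key: str, filename: str, replace: bool) -> None:
        # Thin wrapper around the Airflow S3 hook's load_file.
        hook = S3Hook(aws_conn_id)
        hook.load_file(filename=filename, key=key, bucket_name=bucket_name, replace=replace)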

Seoul_housing

from airflow import DAG
from airflow.decorators import task
from airflow.models import Variable
from datetime import timedelta
from plugins.utils import FileManager, RequestTool
from plugins.s3 import S3Helper

import requests
import pandas as pd
import datetime
import os
import logging


@task
def extract(base_url: str):
    verify = False
    result = []

    start_date = datetime.datetime(2024,1,1).date()
    end_date = datetime.datetime.today().date()
    current_date = start_date

    while current_date <= end_date:
        date = current_date.strftime("%Y-%m-%d").replace('-','')

        req_params = {
            "KEY": Variable.get('api_key_seoul'),
            "TYPE": 'json',
            "SERVICE": 'tbLnOpendataRtmsV',
            "START_INDEX": 1,
            "END_INDEX": 1000,
            "EXTRA": ' / / / / / / / / ',
            "MSRDT_DE": date
            }
        
        try:
            data = RequestTool.api_request(base_url, verify, req_params)
            result.append([data, str(current_date)])
        except Exception as e:
            # Some dates return no trade data; log it and move on to the next day.
            logging.warning(f'housing_extract skipped {date}: {e}')

        current_date += timedelta(days=1)


    logging.info('Success : housing_extract')

    return result

@task
def transform(responses):
    result = []

    for response in responses:

        data = response[0]
        date = response[1]

        df = pd.DataFrame(data['tbLnOpendataRtmsV']['row'])

        housing_data = df[['DEAL_YMD', 'SGG_NM', 'OBJ_AMT', 'BLDG_AREA', 'FLOOR', 'BUILD_YEAR', 'HOUSE_TYPE']]
        result.append([housing_data, date])


    logging.info('Success : housing_transform')
        
    return result

@task
def upload(records):

    for record in records:
        data = record[0]
        date = record[1]

        file_name = f'{date}.csv'

        file_path = f'temp/Seoul_housing/{file_name}'
        FileManager.mkdir(file_path)

        s3_key = key + str(file_name)

        data.to_csv(file_path, header = False, index = False, encoding='utf-8-sig')
        S3Helper.upload(aws_conn_id, bucket_name, s3_key, file_path, True)

        FileManager.remove(file_path)

        logging.info('Success : housing_load')

with DAG(
    dag_id = 'Seoul_housing',
    start_date = datetime.datetime(2024,1,1),
    schedule = '@daily',
    max_active_runs = 1,
    catchup = False,
    default_args = {
        'retries': 1,
        'retry_delay': timedelta(minutes=3),
    }
) as dag:
    aws_conn_id='aws_default'
    bucket_name = 'bucket_name'
    key = 's3_key'
    base_url = 'http://openAPI.seoul.go.kr:8088'


    records = transform(extract(base_url))

    upload(records)
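
For reference, the transform task above only keeps a handful of columns from the API response. The snippet below runs the same column selection on a tiny, made-up payload shaped like the tbLnOpendataRtmsV response; only the nested keys and column names come from the DAG, and the field values (and the extra BJDONG_NM column) are invented for illustration.

import pandas as pd

# Made-up sample shaped like the Seoul open API response used in Seoul_housing.
sample = {
    'tbLnOpendataRtmsV': {
        'row': [
            {'DEAL_YMD': '20240115', 'SGG_NM': '강남구', 'OBJ_AMT': '150000',
             'BLDG_AREA': '84.97', 'FLOOR': '12', 'BUILD_YEAR': '2015',
             'HOUSE_TYPE': '아파트', 'BJDONG_NM': '역삼동'},
        ]
    }
}

df = pd.DataFrame(sample['tbLnOpendataRtmsV']['row'])
# Same projection as the transform task: columns not listed (e.g. BJDONG_NM) are dropped.
housing_data = df[['DEAL_YMD', 'SGG_NM', 'OBJ_AMT', 'BLDG_AREA', 'FLOOR', 'BUILD_YEAR', 'HOUSE_TYPE']]
print(housing_data)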

Seoul_POP

from airflow import DAG
from airflow.decorators import task
from airflow.models import Variable
from airflow.operators.python import get_current_context
from datetime import timedelta
from plugins.utils import FileManager, RequestTool
from plugins.s3 import S3Helper

import requests
import pandas as pd
import datetime
import os
import logging

def target_date() -> str:
    # The original default_args used '{{ macros.ds_add(ds, -4) }}': the daily
    # population data is only published a few days late, so each run targets
    # the logical date minus 4 days.
    ds = get_current_context()['ds']
    return (datetime.datetime.strptime(ds, '%Y-%m-%d') - timedelta(days=4)).strftime('%Y-%m-%d')


@task
def extract(base_url: str):
    execution_date = target_date()
    req_params = {
        "KEY": Variable.get('api_key_seoul'),
        "TYPE": 'json',
        "SERVICE": 'SPOP_DAILYSUM_JACHI',
        "START_INDEX": 1,
        "END_INDEX": 1000,
        "MSRDT_DE": execution_date.replace('-', '')
        }
    verify = False
    result = RequestTool.api_request(base_url, verify, req_params)

    logging.info('Success : life_people_extract')

    return result

@task
def transform(response):

    data = response
    try:
        df = pd.DataFrame(data['SPOP_DAILYSUM_JACHI']['row'])

        life_people_data = df[['STDR_DE_ID', 'SIGNGU_NM', 'TOT_LVPOP_CO']]

        logging.info(f'Success : life_people_transform')

        return life_people_data
    
    except (KeyError, TypeError):

        # The API response has no 'row' key when there is no data for the target date.
        logging.error('no data found')

        return None

@task
def load(record):

    try:
        data = record
        execution_date = target_date()

        file_name = f'{execution_date}.csv'
        file_path = f'temp/Seoul_pop/{file_name}'
        
        FileManager.mkdir(file_path)

        data.to_csv(file_path, header = False, index = False, encoding='utf-8-sig')

        logging.info(f'Success : life_people_load')

        return file_path
    
    except (TypeError, AttributeError):
        # record is None when transform found no data for the target date.
        logging.error('no data found')
        return None

    
@task
def upload(file):
    
    try:
        local_file = file
        file_name = target_date()

        s3_key = key + str(file_name)

        S3Helper.upload(aws_conn_id, bucket_name, s3_key, local_file, True)

        FileManager.remove(local_file)

        logging.info(f'Success : life_people_upload ({file_name})')
    
    except Exception:
        # local_file is None when no data was loaded for this date.
        logging.error('no data found')


with DAG(
    dag_id = 'Seoul_Population',
    start_date = datetime.datetime(2024,1,1),
    schedule = '@daily',
    max_active_runs = 1,
    catchup = True,
    default_args = {
        'retries': 1,
        'retry_delay': timedelta(minutes=3),
    }
) as dag:
    aws_conn_id='aws_default'
    bucket_name = 'de-team5-s3-01'
    key = 'raw_data/seoul_pop/'
    base_url = 'http://openAPI.seoul.go.kr:8088'

    records = transform(extract(base_url))

    upload(load(records))

Data Cleaning

Wrote the code that cleans the data loaded into raw_data and loads it again into cleaned_data; the overall structure is now in place.

I tested it on the dataset I am in charge of.
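
The cleaning code itself is not shown in this post, so the following is only a minimal sketch of the kind of step involved, assuming pandas and the housing columns extracted above. The column names come from the Seoul_housing DAG, while the cleaning rules (numeric casts, dropping incomplete rows) and the file paths are illustrative assumptions.

import pandas as pd

# Hypothetical cleaning step for the housing data: read one raw CSV written by
# the DAG (no header), enforce types, drop incomplete rows, and write the
# result back out for the cleaned_data area.
columns = ['DEAL_YMD', 'SGG_NM', 'OBJ_AMT', 'BLDG_AREA', 'FLOOR', 'BUILD_YEAR', 'HOUSE_TYPE']

def clean_housing(raw_path: str, cleaned_path: str) -> None:
    df = pd.read_csv(raw_path, names=columns, encoding='utf-8-sig')

    # Cast numeric fields and drop rows where they could not be parsed.
    for col in ['OBJ_AMT', 'BLDG_AREA', 'FLOOR', 'BUILD_YEAR']:
        df[col] = pd.to_numeric(df[col], errors='coerce')
    df = df.dropna(subset=['OBJ_AMT', 'BLDG_AREA'])

    df.to_csv(cleaned_path, header=False, index=False, encoding='utf-8-sig')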

[Screenshot: original data]

[Screenshot: after Data Cleaning]
