[프로그래머스] 데브코스 데이터엔지니어링 최종 프로젝트 3주차

주재민·2024년 2월 26일
0
post-thumbnail

Data Cleaning

작업이 생각보다 더 오래걸린다.

기능 자체는 구현이 완료 되었고 소스도 규격도 다른 데이터들을 공통적으로 적용할 수 있도록 코드를 작성하고 모듈화 시켰다.

filter.py

from pydantic import BaseModel, validator
from datetime import date, datetime
import pandas as pd

class housing(BaseModel):
    계약일: date
    자치구: str
    가격: int
    면적: float
    층수: int
    건축년도: int
    건물용도: str

    @classmethod
    def from_dataframe_row(cls, row):
        # 건축년도 결측치 중 null이라는 문자열로 처리 된 것들도 있고 비워져 있는 것도 있어서 에러가 나는 듯 한데 아래 코드 추가하니까 괜찮아졌습니다.
        row['건축년도'] = row['건축년도'] if pd.notna(row['건축년도']) else 0
        return cls(**row)
    
    @validator('계약일', pre=True, always=True)
    def parse_date(cls, value):
        # 정수로 받은 날짜를 datetime.date 객체로 변환
        if isinstance(value, int):
            value = str(value)
        return date(int(value[:4]), int(value[4:6]), int(value[6:]))
    
    @validator('자치구', '건물용도')
    def handle_string_column(cls, value):
        # 문자열로 된 컬럼의 결측치를 'NULL'로 처리
        return 'NULL' if pd.isna(value) else value

    @validator('가격', '면적', '층수', '건축년도')
    def handle_numeric_columns(cls, value):
        # 실수나 정수형으로 된 컬럼의 결측치를 0으로 처리
        return 0 if pd.isna(value) else value

(모든 코드 중 일부분)

cleaning.py

from airflow.plugins_manager import AirflowPlugin
from airflow.models import Variable
from io import StringIO
from plugins import filter

import pandas as pd
import boto3

s3_client = boto3.client('s3', aws_access_key_id=Variable.get("aws_access_key_id"),
                    aws_secret_access_key=Variable.get("aws_secret_access_key"))

class Cleaning(AirflowPlugin):

    def read_csv_to_df(subject: str, file, column_indexes: list):
        response = s3_client.get_object(Bucket="de-team5-s3-01", Key=f'raw_data/seoul_{subject}/{file}.csv')
        csv_content = response['Body'].read().decode('utf-8')
        df = pd.read_csv(StringIO(csv_content), header=None, usecols=column_indexes)
            
        return df

    
    def rename_cols(df: pd.DataFrame, subject: str):
        if subject == 'air' or subject == 'noise':
            df.drop(index=0, axis=0, inplace=True)

        column_names = filter.columns[subject]
        df.columns = column_names


        return df
    
    def check_pk_validation(df: pd.DataFrame, pk: str):
        df = df.dropna(subset=[pk])

        return df
    
    def unify_null(df: pd.DataFrame):
        df.replace('null', pd.NA, inplace=True)
        df.replace('-', pd.NA, inplace=True)

        return df
    
    def filter(df: pd.DataFrame, subject):
        sub = getattr(filter, subject)
        models = [sub.from_dataframe_row(row) for _, row in df.iterrows()]
        result_df = pd.DataFrame([model.dict() for model in models])

        return result_df

이후에 다시 S3의 cleaned_data에 적재하는 ETL코드를 작성했다.

housing_cleaning

from plugins.cleaning import Cleaning
from airflow import DAG
from datetime import timedelta
from airflow.decorators import task
from plugins import filter
from plugins.utils import FileManager
from plugins.s3 import S3Helper

import datetime


@task
def cleaning(**context):
    try:
        execution_date = context['execution_date'].date()
        data = Cleaning.read_csv_to_df('housing', execution_date, filter.column_indexes['housing'])
        data = Cleaning.check_pk_validation(Cleaning.rename_cols(data, 'housing'), '자치구' if '자치구' in filter.columns['housing'] else '권역')
        result_data = Cleaning.unify_null(data)

        result_data = Cleaning.filter(result_data, 'housing')

        print(data)

        file_path = f'/works/'
        file_name = '{}.csv'.format(execution_date)
        local = file_path+file_name

        s3_key = 'cleaned_data/seoul_housing/' + file_name

        result_data.to_csv(local, header = False, index = False, encoding='utf-8-sig')

        S3Helper.upload(aws_conn_id, bucket_name, s3_key, local, True)

        FileManager.remove(local)
    
    except:
        pass

with DAG(
    dag_id = 'Housing_Cleaning',
    start_date = datetime.datetime(2024,1,1),
    schedule = '@daily',
    max_active_runs = 1,
    catchup = True,
    default_args = {
        'retries': 1,
        'retry_delay': timedelta(minutes=1),
    }
) as dag:
    aws_conn_id='aws_default'
    bucket_name = 'de-team5-s3-01'

    cleaning()

위의 작업을 부동산, 인구, 소음, 도로상황, 대기상황에 대해 진행했다.
업데이트가 필요없는 정적인 데이터들은 로컬에서 수작업으로 cleaning한 후에 적재했다.

cleaning 이 후 결측치 처리가 제대로 되지 않는 issue가 발생, 여러가지 결측치 형태들을 하나의 형태로 변경 후 일괄 처리하려 했으나 해당 부분에 issue가 있는 것으로 판단

본래 None으로 치환한 후 일괄 처리하려 했으나 인식하지 못하는 것으로 판단되어 np.nan으로 치환하도록 코드를 변경

문자열 컬럼에서 np.nan으로 치환 후 결측치 처리 시 유효성 검사에서 nan을 문자열로 인식하는 issue 발생, isna를 통한 결측치 처리가 아니라 문자열 nan이 들어올 시 처리하도록 코드를 수정함, 정상 작동을 확인

csv로 적재 후 쿼리작업 진행 시 속도가 느리다는 issue 제기, csv가 아닌 parquet 형식으로 파일을 저장하도록 변경

Data Dependency

raw_data 적재 후 바로 Data Cleaning이 실행되도록 ExternaltaskSensor 이용해 raw_data 적재 Dag가 완료되면 Data Cleaning Dag가 실행되도록 코드 작성

cleaning하는 코드에 아래와 같은 코드를 추가

sensor = ExternalTaskSensor(
        task_id='externaltasksensor',
        external_dag_id='etl_seoul_housing',
        external_task_id='upload',
        timeout=5*60,
        mode='reschedule',
        allowed_states=['success'],
        dag=dag
)

3개의 댓글

comment-user-thumbnail
2024년 2월 29일

안녕하세요! 데이터엔지니어 데브코스 지원을 고민하고 있는데, 5개월을 쓸 만큼 가치가 있었는지 궁금합니다!

1개의 답글