
작업이 생각보다 더 오래걸린다.
기능 자체는 구현이 완료 되었고 소스도 규격도 다른 데이터들을 공통적으로 적용할 수 있도록 코드를 작성하고 모듈화 시켰다.
from pydantic import BaseModel, validator
from datetime import date, datetime
import pandas as pd
class housing(BaseModel):
계약일: date
자치구: str
가격: int
면적: float
층수: int
건축년도: int
건물용도: str
@classmethod
def from_dataframe_row(cls, row):
# 건축년도 결측치 중 null이라는 문자열로 처리 된 것들도 있고 비워져 있는 것도 있어서 에러가 나는 듯 한데 아래 코드 추가하니까 괜찮아졌습니다.
row['건축년도'] = row['건축년도'] if pd.notna(row['건축년도']) else 0
return cls(**row)
@validator('계약일', pre=True, always=True)
def parse_date(cls, value):
# 정수로 받은 날짜를 datetime.date 객체로 변환
if isinstance(value, int):
value = str(value)
return date(int(value[:4]), int(value[4:6]), int(value[6:]))
@validator('자치구', '건물용도')
def handle_string_column(cls, value):
# 문자열로 된 컬럼의 결측치를 'NULL'로 처리
return 'NULL' if pd.isna(value) else value
@validator('가격', '면적', '층수', '건축년도')
def handle_numeric_columns(cls, value):
# 실수나 정수형으로 된 컬럼의 결측치를 0으로 처리
return 0 if pd.isna(value) else value
(모든 코드 중 일부분)
from airflow.plugins_manager import AirflowPlugin
from airflow.models import Variable
from io import StringIO
from plugins import filter
import pandas as pd
import boto3
s3_client = boto3.client('s3', aws_access_key_id=Variable.get("aws_access_key_id"),
aws_secret_access_key=Variable.get("aws_secret_access_key"))
class Cleaning(AirflowPlugin):
def read_csv_to_df(subject: str, file, column_indexes: list):
response = s3_client.get_object(Bucket="de-team5-s3-01", Key=f'raw_data/seoul_{subject}/{file}.csv')
csv_content = response['Body'].read().decode('utf-8')
df = pd.read_csv(StringIO(csv_content), header=None, usecols=column_indexes)
return df
def rename_cols(df: pd.DataFrame, subject: str):
if subject == 'air' or subject == 'noise':
df.drop(index=0, axis=0, inplace=True)
column_names = filter.columns[subject]
df.columns = column_names
return df
def check_pk_validation(df: pd.DataFrame, pk: str):
df = df.dropna(subset=[pk])
return df
def unify_null(df: pd.DataFrame):
df.replace('null', pd.NA, inplace=True)
df.replace('-', pd.NA, inplace=True)
return df
def filter(df: pd.DataFrame, subject):
sub = getattr(filter, subject)
models = [sub.from_dataframe_row(row) for _, row in df.iterrows()]
result_df = pd.DataFrame([model.dict() for model in models])
return result_df
이후에 다시 S3의 cleaned_data에 적재하는 ETL코드를 작성했다.
from plugins.cleaning import Cleaning
from airflow import DAG
from datetime import timedelta
from airflow.decorators import task
from plugins import filter
from plugins.utils import FileManager
from plugins.s3 import S3Helper
import datetime
@task
def cleaning(**context):
try:
execution_date = context['execution_date'].date()
data = Cleaning.read_csv_to_df('housing', execution_date, filter.column_indexes['housing'])
data = Cleaning.check_pk_validation(Cleaning.rename_cols(data, 'housing'), '자치구' if '자치구' in filter.columns['housing'] else '권역')
result_data = Cleaning.unify_null(data)
result_data = Cleaning.filter(result_data, 'housing')
print(data)
file_path = f'/works/'
file_name = '{}.csv'.format(execution_date)
local = file_path+file_name
s3_key = 'cleaned_data/seoul_housing/' + file_name
result_data.to_csv(local, header = False, index = False, encoding='utf-8-sig')
S3Helper.upload(aws_conn_id, bucket_name, s3_key, local, True)
FileManager.remove(local)
except:
pass
with DAG(
dag_id = 'Housing_Cleaning',
start_date = datetime.datetime(2024,1,1),
schedule = '@daily',
max_active_runs = 1,
catchup = True,
default_args = {
'retries': 1,
'retry_delay': timedelta(minutes=1),
}
) as dag:
aws_conn_id='aws_default'
bucket_name = 'de-team5-s3-01'
cleaning()
위의 작업을 부동산, 인구, 소음, 도로상황, 대기상황에 대해 진행했다.
업데이트가 필요없는 정적인 데이터들은 로컬에서 수작업으로 cleaning한 후에 적재했다.
cleaning 이 후 결측치 처리가 제대로 되지 않는 issue가 발생, 여러가지 결측치 형태들을 하나의 형태로 변경 후 일괄 처리하려 했으나 해당 부분에 issue가 있는 것으로 판단
본래 None으로 치환한 후 일괄 처리하려 했으나 인식하지 못하는 것으로 판단되어 np.nan으로 치환하도록 코드를 변경
문자열 컬럼에서 np.nan으로 치환 후 결측치 처리 시 유효성 검사에서 nan을 문자열로 인식하는 issue 발생, isna를 통한 결측치 처리가 아니라 문자열 nan이 들어올 시 처리하도록 코드를 수정함, 정상 작동을 확인
csv로 적재 후 쿼리작업 진행 시 속도가 느리다는 issue 제기, csv가 아닌 parquet 형식으로 파일을 저장하도록 변경
raw_data 적재 후 바로 Data Cleaning이 실행되도록 ExternaltaskSensor 이용해 raw_data 적재 Dag가 완료되면 Data Cleaning Dag가 실행되도록 코드 작성
cleaning하는 코드에 아래와 같은 코드를 추가
sensor = ExternalTaskSensor(
task_id='externaltasksensor',
external_dag_id='etl_seoul_housing',
external_task_id='upload',
timeout=5*60,
mode='reschedule',
allowed_states=['success'],
dag=dag
)
안녕하세요! 데이터엔지니어 데브코스 지원을 고민하고 있는데, 5개월을 쓸 만큼 가치가 있었는지 궁금합니다!