[프로그래머스] 데브코스 데이터엔지니어링 최종 프로젝트 1주차

주재민·2024년 2월 12일
0
post-thumbnail

0주차 프로젝트 주간 전(2024. 02. 08. ~ 09.)

프로젝트 아이디어 선정 회의 및 아이디어 구체화

행복 지수에 영향을 주는 요인들(https://www.kaggle.com/datasets/willianoliveiragibin/2024-urban-bliss-index/data)을 토대로 구별 행복 지수 시각화

그러나 행복 지수라는 개념이 너무 주관적 및 도출식을 알 수 없음. 행복지수 -> 구별 주택가격으로 선회

해당 프로젝트 주제를 위한 데이터 소스 탐색

2024. 02. 12. ~ 2024. 02 .16.

아키텍쳐 토의

  • 서버(US) :
    ▹ ec2 - Airflow
    ▹ ec2 - Airflow MetaDB
    ▹ ec2 - Spark

  • 스토리지(DL) : S3

  • DB(DW) : Redshift

  • 파이프라인 관리 : Airflow

프로젝트 방향성 토의(서울시 구단위 부동산 관련 통합 서비스)

  • 주택 가격 예측, 주택 거래 연결, 주택 가격 비교, 부동산 투자 판단 보조지표, 부동산 거래 플랫폼 통합 등에 이용할 수 있을 것으로 보임

기타

  • API 공공 데이터 이용을 위한 인증키 발급

커밋 브랜치 규칙

브랜치 종류 별 역할

  • main branch : 배포 가능한 상태만을 관리한다.
  • dev branch : 기능 개발을 위한 브랜치들을 병합하기 위해 사용한다. 모든 기능이 추가되고 버그가 수정되어 배포 가능한 안정적인 상태라면 해당 dev브랜치를 main 브랜치에 merge한다. 평소에는 이 브랜치를 기반으로 개발을 진행한다.
  • feat branch : 기능을 개발하는 브랜치로, 새로운 기능 개발 및 버그 수정이 필요할 때마다 dev 브랜치로부터 분기한다. 개발이 완료되면 dev 브랜치로 merge하여 팀원과 공유한다. 더 이상 필요하지 않은 feat 브랜치는 삭제합니다.
  • fix branch : 배포한 버전에 긴급하게 수정을 해야 할 필요가 있을 경우, main브랜치에서 분기한다. 문제가 되는 부분을 수정 후에 main 브랜치에 merge하고 배포한다. fix 브랜치에서의 변경 사항은 dev 브랜치에도 merge한다.

중심이 되는 main와 dev 브랜치를 제외한 나머지 feat, fix 브랜치는 merge되면 삭제하도록 한다.

명명 규칙

feat 브랜치는 feat/기능요약 형식을 사용하도록 한다.

fix 브랜치는 hotfix-버전 형식을 사용하도록 한다.

branch 이름 앞에 Jira에서 발행한 이슈 키를 붙이면 Jira와 자동으로 연동 (선택사항)

Commit 메세지 규칙

메세지 앞에 아래의 유형 중 하나를 골라 앞머리에 작성

메세지 앞에 Jira에서 발행한 이슈 키를 적으면 자동으로 연동 (선택)

  • feat - 새로운 기능 추가
  • fix - 버그나 코드 수정
  • docs - 문서 수정 (README.md, Issue Template, 라이센스 등)
  • style - 코드 포맷팅, 세미콜론 누락 등 코드의 변경이 없는 경우
  • refactor - 코드 리팩토링
  • test - 테스트 코드, 리팩토링 테스트 코드 추가
  • chore - 빌드 업무 수정, 패키지 매니저 수정 등 설정 변경 (.gitignore 포함)

데이터 수집

내가 맡아 수집할 정보 : 서울시 생활 인구, 부동산 관련 정보

ETL코드 작성을 어느정도 끝냈는데 저장하는 부분에서 막힘.
멘토님께 여쭤보니 Airlfow 가상환경에서 dag를 실행하고 local에서 저장하려고 한 것이 문제인 것 같다고 하심.
경로를 바꿔서 시도했으나 여전히 실패

Airflow를 재설치한 후 docker-compose 파일을 수정해서 해결했다. 정상적으로 저장되는 것을 확인.

S3 업로드가 안되는 문제가 발생, 전달받은 키페어에 오타가 있었고 해당 문제를 수정, 정상적으로 적재되는 것을 확인.

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from datetime import timedelta
import requests
import pandas as pd
import datetime
import os


def life_people_extract(**context):
    urls = []
    link = context['params']['url']

    start_date = datetime.datetime(2024,1,1).date()
    end_date = datetime.datetime.today().date()

    current_date = start_date

    while current_date <= end_date:
        date = current_date.strftime("%Y-%m-%d").replace('-','')
        url = link+date

        urls.append([url, str(current_date)])
        current_date += timedelta(days=1)

    return urls

def housing_extract(**context):
    urls = []
    link = context['params']['url']

    start_date = datetime.datetime(2024,1,1).date()
    end_date = datetime.datetime.today().date()

    current_date = start_date

    while current_date <= end_date:
        date = current_date.strftime("%Y-%m-%d").replace('-','')
        url = link+date

        urls.append([url, str(current_date)])
        current_date += timedelta(days=1)

    return urls

def life_people_transform(**context):
    result = []
    responses = context["task_instance"].xcom_pull(key="return_value", task_ids="life_people_extract")

    for response in responses:
        res = requests.get(response[0])
        data = res.json()
        date = response[1]

        try:

            df = pd.DataFrame(data['SPOP_DAILYSUM_JACHI']['row'])

            life_people_data = df[['STDR_DE_ID', 'SIGNGU_NM', 'TOT_LVPOP_CO']]
            result.append([life_people_data, date])
        
        except:
            pass

    return result

def housing_transform(**context):
    result = []
    responses = context["task_instance"].xcom_pull(key="return_value", task_ids="housing_extract")

    for response in responses:
        res = requests.get(response[0])
        data = res.json()
        date = response[1]

        try:

            df = pd.DataFrame(data['tbLnOpendataRtmsV']['row'])

            housing_data = df[['DEAL_YMD', 'SGG_NM', 'OBJ_AMT', 'BLDG_AREA', 'FLOOR', 'BUILD_YEAR', 'HOUSE_TYPE']]
            result.append([housing_data, date])
        
        except:
            pass

    return result

def life_people_upload(**context):
    records = context["task_instance"].xcom_pull(key="return_value", task_ids="life_people_transform")
    s3_hook = S3Hook(aws_conn_id='aws_default')
    file_path = '/works'

    for record in records: 
    
        data = record[0]
        date = record[1]

        file_name = '{}.csv'.format(date)

        os.makedirs(file_path, exist_ok=True)
        local_file = os.path.join(file_path, file_name)

        pd.DataFrame(data).to_csv(local_file, index = False)

        s3_hook.load_file(file, key = 'key', bucket_name = 'de-team5-s3-01', replace = True)

        os.remove(file)

def housing_upload(**context):
    records = context["task_instance"].xcom_pull(key="return_value", task_ids="housing_transform")
    s3_hook = S3Hook(aws_conn_id='aws_default')
    file_path = '/works'

    for record in records: 
    
        data = record[0]
        date = record[1]

        file_name = '{}.csv'.format(date)

        os.makedirs(file_path, exist_ok=True)
        local_file = os.path.join(file_path, file_name)

        pd.DataFrame(data).to_csv(local_file, index = False)

        s3_hook.load_file(local_file, key = 'key', bucket_name = 'de-team5-s3-01', replace = True)

        os.remove(file)

dag = DAG(
    dag_id = 'testing',
    start_date = datetime.datetime(2024,1,1),
    schedule = '0 0 * * * *',
    max_active_runs = 1,
    catchup = False,
    default_args = {
        'retries': 1,
        'retry_delay': timedelta(minutes=3),
    }
)

life_people_extract = PythonOperator(
    task_id = 'life_people_extract',
    python_callable = life_people_extract,
    params = {
        'url':  'http://openapi.seoul.go.kr:8088/api_key/json/SPOP_DAILYSUM_JACHI/1/1000/'
    },
    dag = dag)

housing_extract = PythonOperator(
    task_id = 'housing_extract',
    python_callable = housing_extract,
    params = {
        'url':  'http://openapi.seoul.go.kr:8088/api_key/json/tbLnOpendataRtmsV/1/1000/ / / / / / / / / /'
    },
    dag = dag)

life_people_transform = PythonOperator(
    task_id = 'life_people_transform',
    python_callable = life_people_transform,
    params = { 
    },  
    dag = dag)

housing_transform = PythonOperator(
    task_id = 'housing_transform',
    python_callable = housing_transform,
    params = { 
    },  
    dag = dag)

life_people_upload = PythonOperator(
    task_id = 'life_people_upload',
    python_callable = life_people_upload,
    params = { 
    },  
    dag = dag)

housing_upload = PythonOperator(
    task_id = 'housing_upload',
    python_callable = housing_upload,
    params = { 
    },  
    dag = dag)

life_people_extract >> life_people_transform >> life_people_upload
housing_extract >> housing_transform >> housing_upload

코드 수정

  • 인구 데이터와 부동산 데이터 etl 코드를 분리
  • 최신 업데이트 생활 인구 데이터는 5일 전 데이터이므로 executuion_date을 수정
  • 부동산 관련 정보는 업데이트되는 주기가 규칙적이지 않아 full-refresh 방식으로 업데이트 하도록 코드를 수정

0개의 댓글