
행복 지수에 영향을 주는 요인들(https://www.kaggle.com/datasets/willianoliveiragibin/2024-urban-bliss-index/data)을 토대로 구별 행복 지수 시각화
그러나 행복 지수라는 개념이 너무 주관적 및 도출식을 알 수 없음. 행복지수 -> 구별 주택가격으로 선회
해당 프로젝트 주제를 위한 데이터 소스 탐색
서버(US) :
▹ ec2 - Airflow
▹ ec2 - Airflow MetaDB
▹ ec2 - Spark
스토리지(DL) : S3
DB(DW) : Redshift
파이프라인 관리 : Airflow
중심이 되는 main와 dev 브랜치를 제외한 나머지 feat, fix 브랜치는 merge되면 삭제하도록 한다.
feat 브랜치는 feat/기능요약 형식을 사용하도록 한다.
fix 브랜치는 hotfix-버전 형식을 사용하도록 한다.
branch 이름 앞에 Jira에서 발행한 이슈 키를 붙이면 Jira와 자동으로 연동 (선택사항)
메세지 앞에 아래의 유형 중 하나를 골라 앞머리에 작성
메세지 앞에 Jira에서 발행한 이슈 키를 적으면 자동으로 연동 (선택)
내가 맡아 수집할 정보 : 서울시 생활 인구, 부동산 관련 정보
ETL코드 작성을 어느정도 끝냈는데 저장하는 부분에서 막힘.
멘토님께 여쭤보니 Airlfow 가상환경에서 dag를 실행하고 local에서 저장하려고 한 것이 문제인 것 같다고 하심.
경로를 바꿔서 시도했으나 여전히 실패
Airflow를 재설치한 후 docker-compose 파일을 수정해서 해결했다. 정상적으로 저장되는 것을 확인.
S3 업로드가 안되는 문제가 발생, 전달받은 키페어에 오타가 있었고 해당 문제를 수정, 정상적으로 적재되는 것을 확인.
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from datetime import timedelta
import requests
import pandas as pd
import datetime
import os
def life_people_extract(**context):
urls = []
link = context['params']['url']
start_date = datetime.datetime(2024,1,1).date()
end_date = datetime.datetime.today().date()
current_date = start_date
while current_date <= end_date:
date = current_date.strftime("%Y-%m-%d").replace('-','')
url = link+date
urls.append([url, str(current_date)])
current_date += timedelta(days=1)
return urls
def housing_extract(**context):
urls = []
link = context['params']['url']
start_date = datetime.datetime(2024,1,1).date()
end_date = datetime.datetime.today().date()
current_date = start_date
while current_date <= end_date:
date = current_date.strftime("%Y-%m-%d").replace('-','')
url = link+date
urls.append([url, str(current_date)])
current_date += timedelta(days=1)
return urls
def life_people_transform(**context):
result = []
responses = context["task_instance"].xcom_pull(key="return_value", task_ids="life_people_extract")
for response in responses:
res = requests.get(response[0])
data = res.json()
date = response[1]
try:
df = pd.DataFrame(data['SPOP_DAILYSUM_JACHI']['row'])
life_people_data = df[['STDR_DE_ID', 'SIGNGU_NM', 'TOT_LVPOP_CO']]
result.append([life_people_data, date])
except:
pass
return result
def housing_transform(**context):
result = []
responses = context["task_instance"].xcom_pull(key="return_value", task_ids="housing_extract")
for response in responses:
res = requests.get(response[0])
data = res.json()
date = response[1]
try:
df = pd.DataFrame(data['tbLnOpendataRtmsV']['row'])
housing_data = df[['DEAL_YMD', 'SGG_NM', 'OBJ_AMT', 'BLDG_AREA', 'FLOOR', 'BUILD_YEAR', 'HOUSE_TYPE']]
result.append([housing_data, date])
except:
pass
return result
def life_people_upload(**context):
records = context["task_instance"].xcom_pull(key="return_value", task_ids="life_people_transform")
s3_hook = S3Hook(aws_conn_id='aws_default')
file_path = '/works'
for record in records:
data = record[0]
date = record[1]
file_name = '{}.csv'.format(date)
os.makedirs(file_path, exist_ok=True)
local_file = os.path.join(file_path, file_name)
pd.DataFrame(data).to_csv(local_file, index = False)
s3_hook.load_file(file, key = 'key', bucket_name = 'de-team5-s3-01', replace = True)
os.remove(file)
def housing_upload(**context):
records = context["task_instance"].xcom_pull(key="return_value", task_ids="housing_transform")
s3_hook = S3Hook(aws_conn_id='aws_default')
file_path = '/works'
for record in records:
data = record[0]
date = record[1]
file_name = '{}.csv'.format(date)
os.makedirs(file_path, exist_ok=True)
local_file = os.path.join(file_path, file_name)
pd.DataFrame(data).to_csv(local_file, index = False)
s3_hook.load_file(local_file, key = 'key', bucket_name = 'de-team5-s3-01', replace = True)
os.remove(file)
dag = DAG(
dag_id = 'testing',
start_date = datetime.datetime(2024,1,1),
schedule = '0 0 * * * *',
max_active_runs = 1,
catchup = False,
default_args = {
'retries': 1,
'retry_delay': timedelta(minutes=3),
}
)
life_people_extract = PythonOperator(
task_id = 'life_people_extract',
python_callable = life_people_extract,
params = {
'url': 'http://openapi.seoul.go.kr:8088/api_key/json/SPOP_DAILYSUM_JACHI/1/1000/'
},
dag = dag)
housing_extract = PythonOperator(
task_id = 'housing_extract',
python_callable = housing_extract,
params = {
'url': 'http://openapi.seoul.go.kr:8088/api_key/json/tbLnOpendataRtmsV/1/1000/ / / / / / / / / /'
},
dag = dag)
life_people_transform = PythonOperator(
task_id = 'life_people_transform',
python_callable = life_people_transform,
params = {
},
dag = dag)
housing_transform = PythonOperator(
task_id = 'housing_transform',
python_callable = housing_transform,
params = {
},
dag = dag)
life_people_upload = PythonOperator(
task_id = 'life_people_upload',
python_callable = life_people_upload,
params = {
},
dag = dag)
housing_upload = PythonOperator(
task_id = 'housing_upload',
python_callable = housing_upload,
params = {
},
dag = dag)
life_people_extract >> life_people_transform >> life_people_upload
housing_extract >> housing_transform >> housing_upload
executuion_date을 수정