
from airflow import DAG
from airflow.decorators import task
from airflow.models import Variable
from datetime import timedelta
from plugins.utils import FileManager, RequestTool
from plugins.s3 import S3Helper
import requests
import pandas as pd
import datetime
import os
import logging
@task
def extract():
    # Collect daily housing records from 2024-01-01 up to today
    verify = False
    result = []
    start_date = datetime.datetime(2024, 1, 1).date()
    end_date = datetime.datetime.today().date()
    current_date = start_date
    while current_date <= end_date:
        date = current_date.strftime("%Y-%m-%d").replace('-', '')
        req_params = {
            "KEY": Variable.get('api_key_seoul'),
            "TYPE": 'json',
            "SERVICE": 'tbLnOpendataRtmsV',
            "START_INDEX": 1,
            "END_INDEX": 1000,
            "EXTRA": ' / / / / / / / / ',
            "MSRDT_DE": date,
        }
        try:
            data = RequestTool.api_request(base_url, verify, req_params)
            result.append([data, str(current_date)])
        except Exception:
            logging.warning(f'no data for {current_date}')
        current_date += timedelta(days=1)
    logging.info('Success : housing_extract')
    return result
@task
def transform(responses):
    result = []
    for response in responses:
        data = response[0]
        date = response[1]
        # Keep only the columns needed for the housing dataset
        df = pd.DataFrame(data['tbLnOpendataRtmsV']['row'])
        housing_data = df[['DEAL_YMD', 'SGG_NM', 'OBJ_AMT', 'BLDG_AREA', 'FLOOR', 'BUILD_YEAR', 'HOUSE_TYPE']]
        result.append([housing_data, date])
    logging.info('Success : housing_transform')
    return result
@task
def upload(records):
    for record in records:
        data = record[0]
        date = record[1]
        file_name = f'{date}.csv'
        file_path = f'temp/Seoul_housing/{file_name}'
        FileManager.mkdir(file_path)
        s3_key = key + str(file_name)
        # Write a temporary CSV, push it to S3, then remove the local copy
        data.to_csv(file_path, header=False, index=False, encoding='utf-8-sig')
        S3Helper.upload(aws_conn_id, bucket_name, s3_key, file_path, True)
        FileManager.remove(file_path)
    logging.info('Success : housing_load')
with DAG(
    dag_id='Seoul_housing',
    start_date=datetime.datetime(2024, 1, 1),
    schedule='@daily',
    max_active_runs=1,
    catchup=False,
    default_args={
        'retries': 1,
        'retry_delay': timedelta(minutes=3),
    },
) as dag:
    aws_conn_id = 'aws_default'
    bucket_name = 'bucket_name'
    key = 's3_key'
    base_url = 'http://openAPI.seoul.go.kr:8088'

    records = transform(extract())
    upload(records)
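RequestTool, FileManager, and S3Helper come from the project's own plugins (plugins/utils.py and plugins/s3.py) and aren't included in this post. Below is a minimal sketch of what they are assumed to do, inferred only from how they are called above; the class and method names match the calls, everything else is a guess.

import os
import requests
from airflow.providers.amazon.aws.hooks.s3 import S3Hook


class RequestTool:
    @staticmethod
    def api_request(base_url: str, verify: bool, req_params: dict) -> dict:
        # Assumption: the Seoul open API takes its parameters as path segments,
        # e.g. /{KEY}/{TYPE}/{SERVICE}/{START_INDEX}/{END_INDEX}/...
        path = '/'.join(str(v) for v in req_params.values())
        response = requests.get(f'{base_url}/{path}', verify=verify)
        response.raise_for_status()
        return response.json()


class FileManager:
    @staticmethod
    def mkdir(file_path: str) -> None:
        # Create the parent directory of the target file if it does not exist
        os.makedirs(os.path.dirname(file_path), exist_ok=True)

    @staticmethod
    def remove(file_path: str) -> None:
        os.remove(file_path)


class S3Helper:
    @staticmethod
    def upload(aws_conn_id, bucket_name, key, file_path, replace=True):
        # Assumed to be a thin wrapper around the Amazon provider's S3Hook
        hook = S3Hook(aws_conn_id=aws_conn_id)
        hook.load_file(filename=file_path, key=key, bucket_name=bucket_name, replace=replace)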
from airflow import DAG
from airflow.decorators import task
from airflow.models import Variable
from datetime import timedelta
from plugins.utils import FileManager, RequestTool
from plugins.s3 import S3Helper
import requests
import pandas as pd
import datetime
import os
import logging
@task
def extract(execution_date: str):
    # Build the request parameters inside the task, where the templated date is available
    req_params = {
        "KEY": Variable.get('api_key_seoul'),
        "TYPE": 'json',
        "SERVICE": 'SPOP_DAILYSUM_JACHI',
        "START_INDEX": 1,
        "END_INDEX": 1000,
        "MSRDT_DE": execution_date.replace('-', ''),
    }
    verify = False
    result = RequestTool.api_request(base_url, verify, req_params)
    logging.info('Success : life_people_extract')
    return result
@task
def transform(response):
    data = response
    try:
        df = pd.DataFrame(data['SPOP_DAILYSUM_JACHI']['row'])
        life_people_data = df[['STDR_DE_ID', 'SIGNGU_NM', 'TOT_LVPOP_CO']]
        logging.info('Success : life_people_transform')
        return life_people_data
    except (KeyError, TypeError):
        logging.error('no data found')
        return None
@task
def load(record, execution_date: str):
    try:
        data = record
        file_name = f'{execution_date}.csv'
        file_path = f'temp/Seoul_pop/{file_name}'
        FileManager.mkdir(file_path)
        data.to_csv(file_path, header=False, index=False, encoding='utf-8-sig')
        logging.info('Success : life_people_load')
        return file_path
    except (TypeError, AttributeError):
        logging.error('no data found')
        return None
@task
def upload(file, execution_date: str):
    try:
        local_file = file
        file_name = execution_date
        s3_key = key + str(file_name)
        S3Helper.upload(aws_conn_id, bucket_name, s3_key, local_file, True)
        FileManager.remove(local_file)
        logging.info(f'Success : life_people_upload ({file_name})')
    except Exception:
        logging.error('no data found')
with DAG(
    dag_id='Seoul_Population',
    start_date=datetime.datetime(2024, 1, 1),
    schedule='@daily',
    max_active_runs=1,
    catchup=True,
    default_args={
        'retries': 1,
        'retry_delay': timedelta(minutes=3),
    },
) as dag:
    aws_conn_id = 'aws_default'
    bucket_name = 'de-team5-s3-01'
    key = 'raw_data/seoul_pop/'
    base_url = 'http://openAPI.seoul.go.kr:8088'
    # The population figures are published a few days late, so each run targets the
    # date four days before the logical date. String arguments to TaskFlow tasks are
    # templated, so the macro is rendered at runtime.
    target_date = '{{ macros.ds_add(ds, -4) }}'

    records = transform(extract(target_date))
    upload(load(records, target_date), target_date)
Wrote the code that cleans the data loaded into raw_data and loads it again into cleaned_data; the overall structure is in place.
I tested it on the dataset I'm responsible for.
Original data
After Data Cleaning
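The cleaning job itself isn't included in this post yet, so here is a rough sketch of the intended raw_data to cleaned_data step for the housing file. The column names follow the housing DAG above; the S3 key layout, the date format, and the cleaning rules (dropping missing prices, casting types, deduplicating) are assumptions and will likely differ from the real job.

import pandas as pd
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

HOUSING_COLUMNS = ['DEAL_YMD', 'SGG_NM', 'OBJ_AMT', 'BLDG_AREA',
                   'FLOOR', 'BUILD_YEAR', 'HOUSE_TYPE']


def clean_housing(aws_conn_id: str, bucket_name: str, date: str) -> None:
    hook = S3Hook(aws_conn_id=aws_conn_id)

    # The raw CSVs were written without a header, so re-apply the column names
    # (the raw_data/seoul_housing/ prefix is hypothetical)
    raw_key = f'raw_data/seoul_housing/{date}.csv'
    local_raw = hook.download_file(key=raw_key, bucket_name=bucket_name)
    df = pd.read_csv(local_raw, names=HOUSING_COLUMNS)

    # Example cleaning rules: drop rows with no price, normalise types, deduplicate
    df = df.dropna(subset=['OBJ_AMT'])
    df['OBJ_AMT'] = pd.to_numeric(df['OBJ_AMT'], errors='coerce')
    df['DEAL_YMD'] = pd.to_datetime(df['DEAL_YMD'], format='%Y%m%d').dt.date  # assuming YYYYMMDD
    df = df.drop_duplicates()

    # Write the cleaned file back under the cleaned_data prefix
    cleaned_path = f'/tmp/cleaned_{date}.csv'
    df.to_csv(cleaned_path, index=False, encoding='utf-8-sig')
    hook.load_file(filename=cleaned_path,
                   key=f'cleaned_data/seoul_housing/{date}.csv',
                   bucket_name=bucket_name, replace=True)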