종합설계 프로젝트 - 2. 1 데이터 적재

Park·2023년 6월 11일
0

종합설계

목록 보기
3/3

데이터 적재

  • 본 파트에서는 인허가데이터, 버스데이터, 지하철데이터를 얻기 위해, 각각의 공공데이터 API서버에 호출하고 응답값을 받는 것까지 모듈화한 것을 보여준다

0. 클래스 추상화

  • 인허가, 버스, 지하철 모두 공통적으로 사용하는 메소드를 인터페이스화 해주었다
  • 파이썬에서도 ABC클래스를 통해 추상화 클래스를 구현할 수 있음
  • request_api()는 지정한 API 서버로부터 요청하여 결과를 반환하는 메소드
  • make_dataframe()은 반환받은 데이터를 가공하기 쉽게 pandas의 dataframe으로 변환하는 코드
  • get_apidata()는 정해진 요청 정보(page_size, return type 등 api에게 요청할 정보)를 입력으로 받은 후 request_api(), make_dataframe()을 반복적으로 호출하여 최종적으로 요청한 데이터를 반환받는 메소드이다
from abc import *

class RequestData(metaclass=ABCMeta):
    
    @abstractmethod
    def request_api(self, params: Dict) -> Dict:
        """
        Get json from API server
        """
        pass

    @abstractmethod
    def get_apidata(self, info: Dict) -> pd.DataFrame:
    	"""
        Every process (request API server and return data)
        """
        pass

    @abstractmethod
    def make_dataframe(self, response_dict: Dict) -> pd.DataFrame:
    	"""
        Dict from individual response(dict converted in python) to DataFrame
        """
        pass

1. 인허가데이터 수집 파트

class RequestLocalData(RequestData):

    url = "http://www.localdata.go.kr/platform/rest/TO0/openDataApi"

    def __init__(self, auth_key: str, *args: tuple):
        self.auth_key = auth_key

        if len(args)<2:
            self.start_date = datetime.datetime.now(timezone('Asia/Seoul')).strftime("%Y%m%d")
            self.end_date = datetime.datetime.now(timezone('Asia/Seoul')).strftime("%Y%m%d")
        else:
            self.start_date = args[0]
            self.end_date = args[1]
    
	"""
    (중략)
    """
    
    def request_api(self, params: Dict) -> Dict:
        """
        Get json data from Localdata API server
        """

        # Request Localdata API server and get response
        response = requests.get(RequestLocalData.url, params=params, verify=False)
        response_text = response.text

        # text to json
        response_dict = json.loads(response_text)
        return response_dict
    
    def get_apidata(self, info: Dict) -> pd.DataFrame:
        
        page_index = info['pageIndex']
        page_size = info['pageSize']
        opnSvcId = info['opnSvcId']

        # Request API server
        response_dict = self.request_api(params=info)
        
        # Count total to get info how many data are to be changed
        try:
            total_data_count = response_dict['result']['header']['paging']['totalCount']
            if total_data_count < 1:
                raise EmptyDataFromResponse
        except Exception as e:
            print(f"{opnSvcId}서비스에는 {self.start_date}부터 {self.end_date}까지의 데이터가 없습니다")
            return None
        
        iter_count = (total_data_count // page_size)
        

        # Make fundamental dataFrame
        response_dataframe = self.make_dataframe(response_dict)
        
        # Iteration for full data
        for _ in range(iter_count):
            page_index += 1
            info['pageIndex'] = page_index

            response_dict = self.request_api(params=info)
            tmp_dataframe = self.make_dataframe(response_dict)
            
            # data concat
            response_dataframe = pd.concat([response_dataframe, tmp_dataframe])
            time.sleep(1)
        
        # Reset index after concatenation
        response_dataframe.reset_index(drop=True, inplace=True)
        print(f"{opnSvcId}서비스의 {self.start_date}부터 {self.end_date}의 데이터를 성공적으로 다운받았습니다")
        return response_dataframe
    
    def make_dataframe(self, response_dict: Dict) -> pd.DataFrame:
        df = pd.json_normalize(response_dict['result']['body']['rows'][0]['row'])
        return df

1.1 get_apidata()

  • get_apidata()서비스 단위로 데이터를 요청해서 반환받는 함수이다.

  • 즉, 내가 병원 데이터가 필요하면, 앞서 언급한 편의시설 정의에 따라 병원(01_01_01_P), 의원(01_01_02_P), 부속의료기관(01_01_03_P) 서비스를 각각 get_apidata() 메소드를 3번 호출하여 각각 나온 데이터들을 하나로 합치면 된다

  • 여기서 get_apidata()가 인자로 받는 값은 다음과 같은 정보이다 (API 요청변수 참고)

{'authKey': 사용자 키,
 'resultType': 반환 타입,
 'lastModTsBgn' : 시작 요청일,
 'lastModTsEnd' : 종료 요청일,
 'pageIndex' : 1,
 'pageSize': 500, 
 'opnSvcId': 서비스 아이디(ex.병원 등)}
  • 이렇게 필요한 정보를 info 파라미터를 통해 넘겨주면, 우선 한 번 호출하여 조회할 날짜에 얼마나 데이터가 있는지 조회한다(total_data_count)
    • 만약 total_data_count가 0이라면 바로 종료한다
    • 그렇지 않다면 iter_count라는 변수를 활용해 앞으로 얼마나 반복해야 하는지 파악한다.
    • 인허가데이터에는 한 번 호출할 때, 최대로 결과값을 return받을 수 있는 pageSize가 최대 500번이기 때문에, 이렇게 나누어서 호출해줘야 한다.
    • 이런식으로 각각 한 번 호출할 때마다 데이터를 concat해준 후, 하나의 서비스에 대한 일일 데이터가 모두 수집되었다면 DataFrame을 반환한다.

1.2 request_api(), make_dataframe()

  • request_api()은 한 번 API서버에 요청할 때 쓰이는 메소드이다.
    • 결과값을 json 라이브러리의 json.loads()를 통해 python에서 손쉽게 처리할 수 있는 딕셔너리 형태로 변환해 준다
  • make_dataframe()은 변환한 딕셔너리를 판다스의 dataframe으로 변환해주는 메소드이다.
    • 데이터가 존재한다면, response_dict['result']['body']['rows'][0]['row']을 통해서 응답 데이터를 딕셔너리에서 추출할 수 있음
    • 추가적으로 json형태의 파일을 한 번에 DataFrame꼴로 변환시키는 pd.json_normalize() 메소드를 통해서 변환시킨다

2. 버스데이터 수집 파트

  • 전국 버스데이터를 한 번에 지원하는 API는 존재하지 않아, 서울 버스데이터(RequestSeoulBusData()), 서울 이외 지역 버스데이터(RequestOtherBusData()) 메소드를 각각 호출한 후 합쳐주는 작업을 한다.

2.1 서울 버스데이터 수집

class RequestSeoulBusData(RequestData):
    seoul_bus_url = 'http://openapi.seoul.go.kr:8088/'

    def __init__(self, auth_key) -> None:
        self.auth_key = auth_key


    # 서울 버스데이터는 start, end index로 전체 데이터 접근하는 방식
    def request_api(self, params: Dict) -> Dict:
        
        key         = params['key']
        type        = params['type']
        service     = params['service']
        start_index = params['start_index']
        end_index   = params['end_index']


        url = self.seoul_bus_url + f'{key}/{type}/{service}/{start_index}/{end_index}'

        response = requests.get(url, verify=False)
        response_text = response.text
        response_dict = json.loads(response_text)

        return response_dict
    
    def get_apidata(self, info: Dict) -> pd.DataFrame:
        # info for request
        start_index = info['start_index']
        end_index = info['end_index']
        page_size = info['page_size']

        # Request
        response_dict = self.request_api(params=info)
        
        # Count total to get info how many data are to be changed
        total_data_count = response_dict['busStopLocationXyInfo']['list_total_count']
        iter_count = (total_data_count // page_size)

        # Make fundamental dataFrame
        response_dataframe = self.make_dataframe(response_dict)
        
        # Iteration for full data
        for _ in range(iter_count):
            
            # Renew index 
            start_index += page_size
            end_index += page_size

            info['start_index'] = start_index
            info['end_index'] = end_index

            # Request Seoul Bus API
            response_dict = self.request_api(params=info)
            tmp_dataframe = self.make_dataframe(response_dict)
            
            # Data concat
            response_dataframe = pd.concat([response_dataframe, tmp_dataframe])
            time.sleep(1)
        
        # reset index after concatenation
        response_dataframe.reset_index(drop=True, inplace=True)

        # 서울 코드 붙이기
        response_dataframe['CityID'] = 11
        response_dataframe['CityName'] = '서울특별시'

        print('서울특별시 완료')
        return response_dataframe
    
    def make_dataframe(self, response_dict: dict) -> pd.DataFrame:
        df = pd.json_normalize(response_dict['busStopLocationXyInfo']['row'])
        return df
  • 서울 버스데이터는 인허가데이터와 비슷한 원리로 데이터를 추출한다.
  • 중요한 점은 데이터를 다 수집하고 나서,
response_dataframe['CityID'] = 11
response_dataframe['CityName'] = '서울특별시'

이렇게 도시 코드를 붙여준다. 나중에 서울 이외 지역 버스데이터와 합칠 때 용이하게 하기 위함이다.

2.2 서울 이외 버스데이터 수집

[코드]

class RequestOtherBusData(RequestData):
    other_bus_url = 'https://apis.data.go.kr/1613000/BusSttnInfoInqireService/getSttnNoList'
    other_possible_list_url = 'http://apis.data.go.kr/1613000/BusSttnInfoInqireService/getCtyCodeList'

    def __init__(self, auth_key) -> None:
        self.auth_key = auth_key

    def request_api(self, params: Dict) -> Dict:

        url = self.other_bus_url

        response = requests.get(url, params=params, verify=False)
        response_text = response.text
        response_dict = json.loads(response_text)

        return response_dict       

    def get_apidata(self, info: Dict) -> pd.DataFrame:
        
        # 가능한 city code 조회
        possible_city_df = self.search_possible_city()

        # Make base dataframe to concat
        code, city = possible_city_df.loc[0]
        info['cityCode'] = code

        all_dataframe = self.get_city_data(city=city, info=info)

        print(f"{city} 완료")
        
        # Traverse all city to get bus data
        for _, row in possible_city_df.loc[1:].iterrows():
            code, city = row['citycode'], row['cityname']
            
            # pageNo 항상 초기화
            info['pageNo'] = 1
            info['cityCode'] = code

            try:
                tmp_dataframe = self.get_city_data(city=city, info=info)
                all_dataframe = pd.concat([all_dataframe, tmp_dataframe])
                print(f"{city} 완료")

            except Exception as e:
                print(e)
                print(f"{city} 도시는 데이터가 없습니다")

            time.sleep(1)
        
        all_dataframe.reset_index(drop=True, inplace=True)

        return all_dataframe
        
    def make_dataframe(self, response_dict: dict) -> pd.DataFrame:
        df = pd.json_normalize(response_dict['response']['body']['items']['item'])
        return df
    
    def get_city_data(self, city: str, info: Dict) -> pd.DataFrame:

        page_no = info['pageNo']
        page_size = info['numOfRows']
        code = info['cityCode']

        # Request
        response_dict = self.request_api(params=info)
        
        # Count total to get info how many data are to be changed
        total_data_count = response_dict['response']['body']['totalCount']
        iter_count = (total_data_count // page_size)

        # Make fundamental dataFrame
        response_dataframe = self.make_dataframe(response_dict)
    
        # Iteration for full data
        for _ in range(iter_count):
            page_no += 1
            info['pageNo'] = page_no

            response_dict = self.request_api(params=info)
            tmp_dataframe = self.make_dataframe(response_dict)
            
            # data concat
            response_dataframe = pd.concat([response_dataframe, tmp_dataframe])
            time.sleep(1)
    
        # reset index after concatenation
        response_dataframe.reset_index(drop=True, inplace=True)

        # Add city id and city name
        response_dataframe['CityID'] = code
        response_dataframe['CityName'] = city

        return response_dataframe
    

    def search_possible_city(self) -> pd.DataFrame:
        
        url = self.other_possible_list_url
        params={'serviceKey' : self.auth_key,
                '_type' : 'json',
                }

        response = requests.get(url, params=params, verify=False)
        response_text = response.text
        response_json = json.loads(response_text)
        possible_city_df = pd.DataFrame(response_json['response']['body']['items']['item'])

        print('검색 가능한 도시 조회 완료\n')
        return possible_city_df
        
  • 서울 이외 지역의 버스데이터를 수집하기 위해서는 하나의 로직이 추가된다

※ (문제) 서울 이외 지역의 버스데이터는 한 번에 수집하는것이 불가능

[해결]

  1. 국토교통부_(TAGO)_버스정류소정보 서비스의 도시코드 목록 조회 서비스를 호출해, 버스정류소 정보 조회가 가능한 지역의 리스트를 모두 추출한 다음
  2. 각 도시마다 동일 서비스의 정류소번호 목록조회 서비스에서 도시코드를 넣고 각 도시별 버스정류소를 호출하는 두 단계를 통해 수집해야 한다.
  • 도시코드 목록 조회를 하는 코드가 search_possible_city() 메소드이다. 서비스는 동일하니, 서비스 키는 그대로 두고 엔드포인트 주소만 클래스 변수로 설정되어 있는 other_possible_list_url로 호출을 하여 가능한 지역 리스트를 DataFrame()으로 받는다.

  • 그 후 각 가능한 지역을 모두 순회하면서 호출하여 데이터를 수집한다. 수집 원리는 서울시 버스데이터 수집하는 원리와 거의 동일


profile
안녕하세요!

0개의 댓글