create table building_floor ( pnu varchar(19), building_pk varchar(33), address varchar, road_address varchar, building_name varchar, dong_name varchar, floor_type_code varchar, floor_type_name varchar, floor_no numeric, floor_no_name varchar, structure_code varchar(2), structure_name varchar, structure_detail varchar, use_code varchar(5), use_name varchar, use_detail varchar, area numeric, main_sub_type varchar, is_excepted_area boolean, created_date varchar(8), uploaded_at timestamp default current_timestamp ); create index index_building_floor_on_pnu on building_floor (pnu); create index index_building_floor_on_use_code on building_floor (use_code);
import psycopg2 conn = psycopg2.connect( 'host=localhost port=5432 dbname=postgres user=postgres password=postgres', options='-c search_path=sbd' ) conn.set_session(autocommit=True) cur = conn.cursor()
cur.execute( f''' select count(*) from information_schema.tables where table_schema = 'sbd' and table_name ~ 'building_floor' ''' ) if not cur.fetchone()[0]: cur.execute( open('sql/source-create_table_building_floor.sql', 'r').read() )
후속 작업에 필요한 컬럼 목록, 테이블 초기화
cur.execute( f''' select column_name from information_schema.columns where table_schema = 'sbd' and table_name = 'building_floor' and column_default is null ''' ) column_list = [ col[0] for col in cur.fetchall() ]
cur.execute( 'delete from building_floor' )
def uploadToDB(data_table): print(datetime.now(),'start...',end='') data_table.columns = [ '관리_건축물대장_PK', '대지_위치', '도로명_대지_위치', '건물_명', '시군구_코드', '법정동_코드', '대지_구분_코드', '번', '지', '특수지_명', '블록', '로트', '새주소_도로_코드', '새주소_법정동_코드', '새주소_지상지하_코드', '새주소_본_번', '새주소_부_번', '동_명', '층_구분_코드', '층_구분_코드_명', '층_번호', '층_번호_명', '구조_코드', '구조_코드_명', '기타_구조', '주_용도_코드', '주_용도_코드_명', '기타_용도', '면적(㎡)', '주_부속_구분_코드', '주_부속_구분_코드_명', '면적_제외_여부', '생성_일자' ] data_table['시도_코드'] = [code[0:2] for code in data_table['시군구_코드']] data_table = data_table[data_table['시도_코드'] == '11'] # '전국' 중 '서울'만 추출 data_table['pnu'] = data_table['시군구_코드'] + data_table['법정동_코드'] + [ # 토지 ID인 PNU 생성 '1' if code == '0' else '2' if code == '1' else '0' for code in data_table['대지_구분_코드'] ] + data_table['번'] + data_table['지'] data_table = data_table[[ 'pnu', '관리_건축물대장_PK', '대지_위치', '도로명_대지_위치', '건물_명', '동_명', '층_구분_코드', '층_구분_코드_명', '층_번호', '층_번호_명', '구조_코드', '구조_코드_명', '기타_구조', '주_용도_코드', '주_용도_코드_명', '기타_용도', '면적(㎡)', '주_부속_구분_코드_명', '면적_제외_여부', '생성_일자' ]] data_table.columns = column_list data_table.floor_no_name = data_table.floor_no_name.replace('[\\\]', '', regex=True) # 전처리 : 특수문자 제거 data_table.structure_detail = data_table.structure_detail.replace('[\\\]', '', regex=True) # 전처리 : 특수문자 제거 data_table.use_detail = data_table.use_detail.replace('[\\\]', '', regex=True) # 전처리 : 특수문자 제거 data_table.is_excepted_area = [ # 0,1 -> False, True False if (pd.isna(tf) or tf == '0') else True for tf in data_table['is_excepted_area'] ] data_table.to_csv( # .txt 파일로 로컬 저장 'temp_building_floor.txt', sep='|', index=False, header=False, encoding='CP949' ) temp_file = open('temp_building_floor.txt', 'r') print('upload...',end='') cur.copy_from( # file bulk insert temp_file, 'building_floor', sep='|', columns=column_list, null='' ) temp_file.close() os.remove('temp_building_floor.txt') print('end',datetime.now())
from zipfile import ZipFile import pandas as pd import os from datetime import datetime zf = ZipFile('D:/data/building_register/building_floor/국토교통부_건축물대장_층별개요+(2022년+07월).zip') source_chunks = pd.read_csv( zf.open('mart_djy_04.txt'), sep='|', encoding='CP949', header=None, dtype='string', chunksize=500_000 )
for source_dt in source_chunks: uploadToDB(source_dt)