Python에서 Postgresql -> influxDB 로 이관하는 방법

kmsdoit·2022년 7월 1일
post-thumbnail

저는 우선 파이썬을 깊게 공부하지 않았습니다. 그렇지만 회사에서는 주로 Go와 Python을 주로 쓰기 때문에 저는 python을 통해서 이관을 진행했습니다. 제 나름대로 짰지만 더 좋은 코드가 있다면 조언 감사하게 받겠습니다.

InfluxDB란?

Influx DB란 많은 쓰기 작업과 쿼리 부하를 처리하기 위해 2013년에 Go 언어로 개발된 오픈소스 Time Series Database(시계열 데이터베이스)로써 Tick Stack(Telegraf + InfluxDB + Chronograf + Kapacitor)의 필수 컴포넌트 중 하나이다. Influx DB는 많은 TSDB들(Prometheus, TimescaleDB, Graphite, 등) 중에서 가장 유명하고, 많이 사용되는 데이터베이스이다. Influx DB는 Distributed, Scale horizontally하게 설계되어 새로운 노드만 추가하면 손쉽게 scale-out할 수 있으며, Restful API를 제공하고 있어 API 통신이 가능하다.

Python 코드

from copy import deepcopy
import time
from influxdb import InfluxDBClient # influx
from datetime import datetime,timedelta
import pandas as pd
import numpy as np
from sqlalchemy import create_engine # postgres

database = create_engine('postgresql://admin:password#@localhost:5432/test') #postgres 접속 정보
conn = database.connect()
client = InfluxDBClient('localhost', 8086) # InfluxDB접속
client.switch_database('test')
print('connection Success')
point = {} # influxdb 기본규격을 위한 dict
json_body = [] # influxdb 기본 규격을 위한 array
work_line = ''
            
def WorkLinemigration(work_line):
    # temp_Table 업데이트 쿼리
    print(work_line)
    q2 = f"update migrationTable_temp_table set time = (select regist_date from migrationTable order by regist_date desc limit 1);"
    
    # 데이터 이관 쿼리
    q1 = f"select * from migrationTable where regist_date > (select time from migrationTable_temp_table) limit 50000"
    sensor_df = pd.read_sql(sql=q1,con=conn)
    if len(sensor_df) > 0:
    	# influxdb 들어가는 기본 규격
        point = {
            "measurement" : f'{work_line}',
            "tags": {
                "tag" : '',
            },
            "fields" :{
                "field" : None,
            },
            "time" : None
        }
        for x in sensor_df.values:
            np = deepcopy(point)
            np['tags']['tag'] = 'tag'
            np['fields']['field'] = 'field'
            np['time'] = datetime.now()
            json_body.append(np)
        client.write_points(json_body) # insert문
        conn.execute(q2) # update쿼리문 실행

이관 코드를 전체를 다쓸 수 없어 내용을 조금 바꿨습니다 데이터가 이백만건이 넘다 보니 중간에 빠질 수도 있기 때문에 migrationTable_Temp_table이라는 곳에 시간을 저장하는 테이블을 만들었습니다

궁금한 이야기가 있으시다면 언제든지 연락주세요 (klmmms882912@naver.com)

profile
하나님 열심히 믿는 Junior back-end Developer🙏

0개의 댓글