[python] influxDB 연동

22_gas·2024년 6월 21일

2023.10.5 작성

개요

이번엔 이미 구축되어있는 flask 서버에서 influxDB를 연동해야 할 일이 생겼다.

InfluxDB 란?

2013년에 Go 언어로 개발된 Time Series Database(시계열 데이터베이스)

몰랐는데 TSDB중에 제일 유명하다고 한다.

설치할때 Retension Policy를 설정할수 있는데 데이터의 보존기간을 설정할 수 있다.

기존 RDB 와 명칭에서 차이가 나니 알아두도록 하자

RDBInfluxDB
DatabaseBucket
TableMeasurement
ColumnKey
Indexed ColumnTag
Unindexed ColumnField

재밌는건 명칭은 bucket으로 지정되는데 접속해서 bucket만드는 쿼리는 "create database" 이다.(이러니 오히려 헷갈림)

해당 테스트는 window를 설치 -> wsl로 ubuntu 18.01을 설치 -> influx 설치 의 순서로 구축후 진행하였다.
(이번 post에서는 influxDB설치는 따로 다루지 않는다.)

제대로 설치가 됐다면 http://localhost:8086/  로 접속이 되며 web UI가 뜰것이다.

organization과 bucket까지 생성했다면 준비가 끝났다. 그 후 token을 발급받아 놓자(언제든 복사 가능하니 안 적어놔도 된다.)

연동

influxdb-client 라이브러리

라이브러리를 설치(influxDB에서 select만 할 예정)

pip install influxdb-client

설치가 끝나면 influxDB접속을 위해 아래의 정보를 이용하여 접속할 것이다.

["organization", "token", "url(host:port)"] 그리고 "bucket"정보

일단 접속하는 방법은 아래와 같다.

from influxdb_client import InfluxDBClient

influx_client = InfluxDBClient(url='http://127.0.0.1:8086', token="{너의 token}", org="{너의 organization 명}")

이제 쿼리를 날려볼 차례인데 

influxDB서 사용가능한 방법이 2가지 라고 한다. sql방식과 fluxQuery가 있다.

1. sql방식은 익히 보던 쿼리문이다.

select * from {measurement}

2. influxQuery: influx에서 만든 쿼리라는데 기존에도 이런 방식으로 되어있어 이 방식을 사용하기로 했다.

from(bucket: "db")
	|> range(start: -10m)
    |> filter(fn: (r) => r["_measurement"] == "test") 
    |> filter(fn: (r) => r["_field"] == "temperature")
    |> sort(columns: ["dev_no"])

쿼리 내용

  1. "YYYY-MM-DD"를 넘겨 받아서  YYYY-MM-DD 00:00:00 ~ YYYY-MM-DD 23:59:59 까지 조회

  2. "_field"는 온도 "temperature" 조회

  3. 장비번호 상위 그룹번호로 filter한다.

from influxdb_client import InfluxDBClient

influx_client = InfluxDBClient(url='http://127.0.0.1:8086', token="{너의 token}", org="{너의 organization 명}")

query_api = influx_client.query_api()

s_date = "2023-10-02"
field = "temperature"

_query = f'''from(bucket: "db")\
            |> range(start: {s_date}T00:00:00Z, stop: {s_date}T23:59:59Z)\
            |> filter(fn: (r) => r["_measurement"] == "test")\
            |> filter(fn: (r) => r["_field"] == "{field}")\
            |> filter(fn: (r) => r["dev_no"] == "{device_id}")
        '''
 result = query_api.query(org="{너의 organization}", query=_query)
 
 
 for row in result:
 	for record in row.records:
		print(record["dev_no"])
        	print(record.get_value())
 
 influx_client.close()

참고사항

  1. influx query를 사용하기 위해선 client에서 query_api()를 가져와야 한다고 한다.

    그냥 sql일 경우 influx_client.query(...)로 호출되는것 같다.

  1. 쿼리의 결과는 "record.get_value()"를 하면 되지만 별도의 작업이 필요하여 
    "컬럼값"과 "_time", "value" 를 전부 가져와야 했다.

컬럼값의 경우 record["컬럼명"] 으로가져올수 있고 _time의 경우 record["_time"] 으로 가져올수 있는데  front-end에서 그래프를 그리거나 할때 time값으로 바꿔주면 좀 편하다 그래서 그렇게 바꿔주었다.

_time의 경우 "2023-10-03 00:00:00.123+00:00" 의 형식으로 아래와 같은 코드로 변환작업이 필요하다.

from datetime import datetime

for row in result:
	for record in row.records:
    		time_obj = datetime.fromisoformat(str(record['_time']).split("+")[0])
       	 	print(int(time_obj.timestamp()))

참 쉬운작업인데 테스트를 하다보니 하루를 소비하였다.

profile
전 아직 모르는게 많아요

0개의 댓글