InfluxDB python 이용해서 batch로 write 하기

0

InfluxDB

목록 보기
3/4

InfluxDB에 데이터를 넣어야 할 일이 있어서
python으로 데이터를 넣는 법을 공부해봤다.

ElasticSearch에서 data를 bulk로 넣을 수 있다고 했는데,
마찬가지로 influxDB에서도 data를 batch로 (여기서는 batch라고 함) 넣을 수 있다.

batch로 쓸 때 관련 자료가 별로 없어서 서칭을 진짜 열심히 한 부분이니
얼른 정리해 두기로 한다 ^_^


추가적으로, 구글링하면 보통 1.x 버전 기준으로 설명되어 있어서 해당 함수를 사용하지 못하는 경우가 많더라.. 그래서 진짜 헷갈리고 힘들었음
아무리 2.x 버전으로 업데이트 했다지만 이렇게 완전히 다르게 만들어서 사용자들을 힘들게 할 이유가 있었던 걸까... 싶음



import influxdb_client
from influxdb_client.client.write_api import SYNCHRONOUS

bucket = "example" #######
org = "example"
token = "[insert token]"
url = "http://[insert url]:8086"

client = influxdb_client.InfluxDBClient(
        url = url,
        token = token,
        org = org
)

write_api = client.write_api(write_options = SYNCHRONOUS)

일단 대부분의 sdk가 그러하듯 이렇게 커넥션을 설정해줘야함

def make_point(tablename, timestamp, dic):
  point = {
      'tags': dic['tags'],
      'fields': dic['fields'],
      'time': timestamp
  }
  return point

    dic = {
      'tag': {
          'id' : example_id,
          'license' : example_license,
      },
      'field' : {
          'iowait' : round(random.uniform(0,1), 4),
          'irq' : round(random.uniform(0,1), 4),
          'steal' : round(random.uniform(0,1), 4),
          'idle' : round(random.uniform(0,1), 4),
          'softirq':round(random.uniform(0,1), 4)
      }
    }

make_point 함수처럼 tags와 fields를 따로 명시해주고, time의 경우에는 그냥 일반 현재 시간으로 넣고 싶다면 timestamp를 굳이 명시하지 않아도 괜찮을 것이다. (아마도)

왜냐면 centOS 에서 influx write으로 line protocol 만들어서 단일로 던져줄 때는 그렇게 잘 들어갔음. 아무래도 그건 influx가 인덱스가 timestamp이다 보니 꼭 필요한 값이라 디폴트 지정을 해주는 게 아닐까 하는 생각


추가적으로, 이건 그냥 내가 필요해서 만든 dictionary 형식으로 던져주는 방식이고, 실제로 던져질 때는

p = make_point(measurement_name, timestamp, dic)

이렇게 넣어주면 됨


한번에 보자면,

import influxdb_client
from influxdb_client.client.write_api import SYNCHRONOUS

bucket = "example" #######
org = "example"
token = "[insert token]"
url = "http://[insert url]:8086"

client = influxdb_client.InfluxDBClient(
        url = url,
        token = token,
        org = org
)

write_api = client.write_api(write_options = SYNCHRONOUS)


def make_point(tablename, timestamp, dic):
  point = {
      'tags': dic['tag'],
      'fields': dic['field'],
      'time': timestamp
  }
  return point

dic = {
    'tag': {
        'id' : example_id,
        'license' : example_license,
    },
    'field' : {
        'iowait' : [value],
        'irq' : [value],
        'steal' : [value],
        'idle' : [value],
        'softirq': [value]
    }
 }

 
p = make_point(measurement_name, time, dic)

send_body = []
send_body.append(p)

if len(send_body) > 5000:
    write_api.write(bucket, org, send_body)
    send_body = []
    time.sleep(0.5)

어쩌다보니 indent가 안맞음 감안 부탁 ㅠㅠ


더 쉽게 말하자면 그냥 dictionary 형태 내에 tagsfieldstime를 만들고,
그 tags와 fields key 안에 value로 나의 tag-value, field-value 를 dictionary 형태로 다시 넣어서 하나의 line protocol을 만든 다음에
그걸 리스트로 담아서 던져버리면 됨 ~ !

말로 하면 어려워 보이는데 하여튼 내가 필요한 형태는 그렇게 된단 것임

point = {
    'tags': {
        'id' : example_id,
        'license' : example_license,
        },
    'fields': {
    	'iowait' : [value],
        'steal' : [value]
    },
    'time': timestamp
}

이해를 돕기 위해 만들어봤음 ㅠ 사실 내가 정리하다보니 기억이 잘 안나서 만듬

이런 형식으로 넣어서 이 point들을 list로 append하고 한번에 던지면 됨
가장 이상적인 batch size는 5000 lines라고 docs에 나와 있었는데 나는 3000개씩 던졌다가 시간 겁나 많이 걸림.

개인적으로 influxDB는 query를 쓸 때나 write를 할 때나 csv 형태로 하는 걸 제일 좋아하는 것 같음.
csv 데이터는 사이즈가 굉장히 컸음에도 엄청 빨리 들어갔음

그것도 정리해봐야겠다!

profile
분명히 처음엔 데린이었는데,, 이제 개린이인가..

0개의 댓글