InfluxDB에 데이터를 넣어야 할 일이 있어서
python으로 데이터를 넣는 법을 공부해봤다.
ElasticSearch에서 data를 bulk로 넣을 수 있다고 했는데,
마찬가지로 influxDB에서도 data를 batch로 (여기서는 batch라고 함) 넣을 수 있다.
batch로 쓸 때 관련 자료가 별로 없어서 서칭을 진짜 열심히 한 부분이니
얼른 정리해 두기로 한다 ^_^
추가적으로, 구글링하면 보통 1.x 버전 기준으로 설명되어 있어서 해당 함수를 사용하지 못하는 경우가 많더라.. 그래서 진짜 헷갈리고 힘들었음
아무리 2.x 버전으로 업데이트 했다지만 이렇게 완전히 다르게 만들어서 사용자들을 힘들게 할 이유가 있었던 걸까... 싶음
import influxdb_client
from influxdb_client.client.write_api import SYNCHRONOUS
bucket = "example" #######
org = "example"
token = "[insert token]"
url = "http://[insert url]:8086"
client = influxdb_client.InfluxDBClient(
url = url,
token = token,
org = org
)
write_api = client.write_api(write_options = SYNCHRONOUS)
일단 대부분의 sdk가 그러하듯 이렇게 커넥션을 설정해줘야함
def make_point(tablename, timestamp, dic):
point = {
'tags': dic['tags'],
'fields': dic['fields'],
'time': timestamp
}
return point
dic = {
'tag': {
'id' : example_id,
'license' : example_license,
},
'field' : {
'iowait' : round(random.uniform(0,1), 4),
'irq' : round(random.uniform(0,1), 4),
'steal' : round(random.uniform(0,1), 4),
'idle' : round(random.uniform(0,1), 4),
'softirq':round(random.uniform(0,1), 4)
}
}
make_point
함수처럼 tags와 fields를 따로 명시해주고, time
의 경우에는 그냥 일반 현재 시간으로 넣고 싶다면 timestamp를 굳이 명시하지 않아도 괜찮을 것이다. (아마도)
왜냐면 centOS 에서 influx write
으로 line protocol
만들어서 단일로 던져줄 때는 그렇게 잘 들어갔음. 아무래도 그건 influx가 인덱스가 timestamp이다 보니 꼭 필요한 값이라 디폴트 지정을 해주는 게 아닐까 하는 생각
추가적으로, 이건 그냥 내가 필요해서 만든 dictionary 형식으로 던져주는 방식이고, 실제로 던져질 때는
p = make_point(measurement_name, timestamp, dic)
이렇게 넣어주면 됨
한번에 보자면,
import influxdb_client
from influxdb_client.client.write_api import SYNCHRONOUS
bucket = "example" #######
org = "example"
token = "[insert token]"
url = "http://[insert url]:8086"
client = influxdb_client.InfluxDBClient(
url = url,
token = token,
org = org
)
write_api = client.write_api(write_options = SYNCHRONOUS)
def make_point(tablename, timestamp, dic):
point = {
'tags': dic['tag'],
'fields': dic['field'],
'time': timestamp
}
return point
dic = {
'tag': {
'id' : example_id,
'license' : example_license,
},
'field' : {
'iowait' : [value],
'irq' : [value],
'steal' : [value],
'idle' : [value],
'softirq': [value]
}
}
p = make_point(measurement_name, time, dic)
send_body = []
send_body.append(p)
if len(send_body) > 5000:
write_api.write(bucket, org, send_body)
send_body = []
time.sleep(0.5)
어쩌다보니 indent가 안맞음 감안 부탁 ㅠㅠ
더 쉽게 말하자면 그냥 dictionary 형태 내에 tags
와 fields
와 time
를 만들고,
그 tags와 fields key 안에 value로 나의 tag-value, field-value 를 dictionary 형태로 다시 넣어서 하나의 line protocol을 만든 다음에
그걸 리스트로 담아서 던져버리면 됨 ~ !
말로 하면 어려워 보이는데 하여튼 내가 필요한 형태는 그렇게 된단 것임
point = {
'tags': {
'id' : example_id,
'license' : example_license,
},
'fields': {
'iowait' : [value],
'steal' : [value]
},
'time': timestamp
}
이해를 돕기 위해 만들어봤음 ㅠ 사실 내가 정리하다보니 기억이 잘 안나서 만듬
이런 형식으로 넣어서 이 point들을 list로 append하고 한번에 던지면 됨
가장 이상적인 batch size는 5000 lines라고 docs에 나와 있었는데 나는 3000개씩 던졌다가 시간 겁나 많이 걸림.
개인적으로 influxDB는 query를 쓸 때나 write를 할 때나 csv 형태로 하는 걸 제일 좋아하는 것 같음.
csv 데이터는 사이즈가 굉장히 컸음에도 엄청 빨리 들어갔음
그것도 정리해봐야겠다!