from elasticsearch import Elasticsearch
from elasticsearch import helpers
import pandas as pd
def get_data_from_es_by_query(es_client, es_index, es_query, es_doc=None, size=1000, call_back=None):
    """Run a search and walk every result page through the scroll API.

    Args:
        es_client: Client object exposing ``search``, ``scroll`` and
            ``clear_scroll`` (e.g. an ``Elasticsearch`` instance).
        es_index: Index name passed to ``search``.
        es_query: Request body passed to ``search``.
        es_doc: Unused; kept so existing callers keep working.
        size: Page size requested for the initial search.
        call_back: Optional callable invoked with each hit. When given,
            hits are NOT accumulated in the returned list.

    Returns:
        A ``(hits, aggregations)`` tuple. ``hits`` is the list of raw hits
        (empty when ``call_back`` is supplied); ``aggregations`` is the
        ``'aggregations'`` section of the first response, or ``None`` when
        the response has none.

    Raises:
        Whatever the client or ``call_back`` raises. The scroll context is
        still released in that case.
    """
    res = es_client.search(index=es_index, body=es_query, scroll='10m', size=size)
    sid = res['_scroll_id']
    aggr_ret = res['aggregations'] if 'aggregations' in res else None

    ret = []
    # One handler for every page: collect the hit, or hand it to the caller.
    handle_hit = ret.append if call_back is None else call_back

    try:
        hits = res['hits']['hits']
        # An empty page marks the end of the scroll, so the reported total
        # (int or {'value': ...} depending on server version) is not needed.
        while hits:
            for hit in hits:
                handle_hit(hit)
            res = es_client.scroll(scroll_id=sid, scroll='10m')
            sid = res['_scroll_id']
            hits = res['hits']['hits']
    finally:
        # Always free the server-side scroll context, even if a scroll call
        # or the callback failed. Keyword form: newer clients do not accept
        # the scroll id positionally.
        es_client.clear_scroll(scroll_id=sid)

    return ret, aggr_ret
# Driver: pull every document (match_all) from the index named below.
index = 'datalake_market_category_matching'
data = {"match_all": {}}
body = {
    'from': 0,
    'size': 100,
    "query": data,
}

es = Elasticsearch(
    "http://your_url",
    timeout=30,
    max_retries=10,
    retry_on_timeout=True,
)

# NOTE: despite its name, `df` holds the two-element tuple returned by
# get_data_from_es_by_query, not a pandas DataFrame.
df = get_data_from_es_by_query(es, index, body)
print(df)