У меня есть запрос, который возвращает в общей сложности около 50-60 миллионов обращений к ElasticSearch. Я попытался разделить рабочую нагрузку на фрагменты, используя многопроцессорность, и, хотя это, безусловно, ускоряет общий процесс в начале, сканирование в конечном итоге замедляется, прежде чем останавливается после сбора около 15-20 миллионов обращений. Приведенный ниже код прекрасно работает для меньшего набора результатов, составляющего около 1 миллиона обращений, но я не могу получить его до конечного значения sh за разумное время для большего набора результатов.
from elasticsearch import Elasticsearch as ES
from elasticsearch_dsl import Search
import multiprocessing as mp
from multiprocessing import Pool
from functools import partial
SLICES = 5
es = ES(['https://example.com/'],http_auth =
(user,pwd),timeout = 120)
def dump_slice(query, index, slice_no):
s = Search(using=es,index=index).update_from_dict(query)
s = s.extra(slice={"id": slice_no, "max": SLICES})
s = s.params(scroll='30m',preserve_order=True,size=5000)
hits = []
for d in s.scan():
my_id = {'id':d.meta.id}
my_index = {'index':d.meta.index}
hits.append({**my_id,**my_index,**d.to_dict()})
if (len(hits) % 250000 == 0):
logging.info(str(len(hits)) + ' records processed by PID ' +
str(mp.current_process().pid))
return hits
q = partial(dump_slice, query, index)
with Pool(SLICES) as pool:
hits = pool.map(q, range(SLICES))