Сканирование с помощьюластиком поиска и нарезки занимает много времени - PullRequest
0 голосов
/ 01 мая 2020

У меня есть запрос, который возвращает в общей сложности около 50-60 миллионов обращений к ElasticSearch. Я попытался разделить рабочую нагрузку на фрагменты, используя многопроцессорность, и, хотя это, безусловно, ускоряет общий процесс в начале, сканирование в конечном итоге замедляется, прежде чем останавливается после сбора около 15-20 миллионов обращений. Приведенный ниже код прекрасно работает для меньшего набора результатов, составляющего около 1 миллиона обращений, но я не могу получить его до конечного значения sh за разумное время для большего набора результатов.

    from elasticsearch import Elasticsearch as ES
    from elasticsearch_dsl import Search
    import multiprocessing as mp
    from multiprocessing import Pool
    from functools import partial

    SLICES = 5

    es = ES(['https://example.com/'],http_auth = 
                   (user,pwd),timeout = 120)



    def dump_slice(query, index, slice_no):    

        s = Search(using=es,index=index).update_from_dict(query)                                        
        s = s.extra(slice={"id": slice_no, "max": SLICES})     
        s = s.params(scroll='30m',preserve_order=True,size=5000)  
        hits = []                  
        for d in s.scan():                                                          

            my_id = {'id':d.meta.id}
            my_index = {'index':d.meta.index}

            hits.append({**my_id,**my_index,**d.to_dict()})


            if (len(hits) % 250000 == 0):
                logging.info(str(len(hits)) + ' records processed by PID ' + 
                              str(mp.current_process().pid))

        return hits

    q = partial(dump_slice, query, index)     

    with Pool(SLICES) as pool:
        hits = pool.map(q, range(SLICES))
...