Прежде всего, я хочу, чтобы вы, ребята, знали, что я знаю основную логику работы того, как работает ElasticSearch Scroll API.Чтобы использовать API Scroll , сначала нам нужно вызвать метод search с некоторым значением прокрутки, например 1m , затем он вернет _scroll_id это будет использоваться для следующих последовательных вызовов Scroll, пока все документы не вернутся в цикл.Но проблема в том, что я просто хочу использовать один и тот же процесс на многопоточной основе, а не на последовательной.Например:
Если у меня 300000 документов, то я хочу обработать / получить документы следующим образом
- 1-й поток обработает исходные 100000 документов
- 2-й поток обработает следующий 100000 документов
- 3-й поток обработает оставшихся 100000 документов
Итак, мойвопрос в том, что я не нашел способа установить значение из в API прокрутки, как я могу ускорить процесс прокрутки с помощью многопоточности.Не обрабатывать документы в последовательном порядке.
Мой пример кода Python
if index_name is not None and doc_type is not None and body is not None:
es = init_es()
page = es.search(index_name,doc_type, scroll = '30s',size = 10, body = body)
sid = page['_scroll_id']
scroll_size = page['hits']['total']
# Start scrolling
while (scroll_size > 0):
print("Scrolling...")
page = es.scroll(scroll_id=sid, scroll='30s')
# Update the scroll ID
sid = page['_scroll_id']
print("scroll id: " + sid)
# Get the number of results that we returned in the last scroll
scroll_size = len(page['hits']['hits'])
print("scroll size: " + str(scroll_size))
print("scrolled data :" )
print(page['aggregations'])