API Elastic Search Scroll Асинхронное выполнение - PullRequest
0 голосов
/ 29 января 2019

Я использую эластичный поисковый кластер версии 5.6 с размером индекса 70 Гб / день.В конце дня нас просят подвести итоги каждого часа за последние 7 дней.Мы используем java-версию клиента High Level Rest и считаем, что количество документов, возвращаемых каждым запросом, крайне важно для прокрутки результатов.

Чтобы воспользоваться имеющимися у нас процессорами и сократить время чтенияМы думали об использовании асинхронной версии Scroll для поиска, но нам не хватает некоторого примера и, по крайней мере, логики для его продвижения вперед.

Мы уже проверили документацию, относящуюся к гибкому, но она расплывчата:

https://www.elastic.co/guide/en/elasticsearch/client/java-rest/5.6/java-rest-high-search-scroll.html#java-rest-high-search-scroll-async

Мы также спрашиваем на гибком дискуссионном форуме, как они говорят, но, похоже, никто не может ответить на этот вопрос:

https://discuss.elastic.co/t/no-code-for-example-of-using-scrollasync-with-the-java-high-level-rest-client/165126

Любая помощь по этому вопросу будет очень признательна, и, конечно, я не единственный, кто имеет этот запрос.

Ответы [ 2 ]

0 голосов
/ 02 марта 2019

Вот пример кода:

    public class App {
    public static void main(String[] args) throws IOException, InterruptedException {
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(HttpHost.create("http://localhost:9200")));

        client.indices().delete(new DeleteIndexRequest("test"), RequestOptions.DEFAULT);
        for (int i = 0; i < 100; i++) {
            client.index(new IndexRequest("test", "_doc").source("foo", "bar"), RequestOptions.DEFAULT);
        }
        client.indices().refresh(new RefreshRequest("test"), RequestOptions.DEFAULT);

        SearchRequest searchRequest = new SearchRequest("test").scroll(TimeValue.timeValueSeconds(30L));
        SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
        String scrollId = searchResponse.getScrollId();

        System.out.println("response = " + searchResponse);

        SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId)
                .scroll(TimeValue.timeValueSeconds(30));


        //I was missing to wait for the results
        final CountDownLatch countDownLatch = new CountDownLatch(1);

        client.scrollAsync(scrollRequest, RequestOptions.DEFAULT, new ActionListener<SearchResponse>() {
            public void onResponse(SearchResponse searchResponse) {
                System.out.println("response async = " + searchResponse);
            }

            public void onFailure(Exception e) {

            }
        });

        //Here we wait
        countDownLatch.await();

        //Clear the scroll if we finish before the time to keep it alive. Otherwise it will be clear when the time is reached.    
        ClearScrollRequest request = new ClearScrollRequest()
        request.addScrollId(scrollId);

        client.clearScrollAsync(request, new ActionListener<ClearScrollResponse>(){
           @Override
           public void onResponse(ClearScrollResponse clearScrollResponse) {
           }

           @Override
           public void onFailure(Exception e) {
           }
         });

        client.close();           
       }
    }

Спасибо Дэвиду Пилато Упругое обсуждение

0 голосов
/ 29 января 2019

Сводка каждого часа за последние 7 дней

Звучит так, как будто вы хотели бы выполнить некоторые агрегации для данных, а не получать необработанные документы.вероятно, на первом уровне дата гистограммы , чтобы агрегировать с интервалами в 1 час.внутри этой гистограммы даты вам нужны внутренние агги для запуска ваших агрегаций - метрики / сегменты в зависимости от необходимых суммирований.

Начиная Elasticsearch v6.1, вы можете использовать Composite Aggregation , чтобыполучить все корзины результатов с помощью подкачки страниц.из документов, которые я связал:

составная агрегация может быть использована для разбиения на страницы всех сегментов из многоуровневой агрегации.Эта агрегация предоставляет способ для потоковой передачи всех сегментов определенной агрегации, аналогично тому, что делает прокрутка для документов.

К сожалению, эта опция не существует до v6.1, поэтому вам также потребуется обновитьES, чтобы использовать его или найти другой способ, например, разбить на несколько запросов, которые вместе покроют 7 дней требования.

...