Elasticsearch Bulk API - PullRequest
       6

Elasticsearch Bulk API

0 голосов
/ 28 апреля 2020

Я хотел бы спросить об Elasticsearch Bulk API

Это мой код для использования Bulk API

public void bulkInsert(String index, ArrayList<String> jsonList) throws IOException {
    BulkRequest request = new BulkRequest(); 

    for(String json: jsonList){
        if(json != null&& !json.isEmpty()){
            request.add(new IndexRequest(index)  
                    .source(json, XContentType.JSON));  
        }
    }

    BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
    for (BulkItemResponse bulkItemResponse : bulkResponse) { 
        DocWriteResponse itemResponse = bulkItemResponse.getResponse(); 

        switch (bulkItemResponse.getOpType()) {
        case INDEX:    
        case CREATE:
            IndexResponse indexResponse = (IndexResponse) itemResponse;
            break;
        case UPDATE:   
            UpdateResponse updateResponse = (UpdateResponse) itemResponse;
            break;
        case DELETE:   
            DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
        }
    }
    if (bulkResponse.hasFailures()) { 
        for (BulkItemResponse bulkItemResponse : bulkResponse) {
            if (bulkItemResponse.isFailed()) { 
                BulkItemResponse.Failure failure =
                        bulkItemResponse.getFailure(); 

                System.out.println("failed: " + failure.getId());

            }
        }
    }
}

Я обнаружил исключение тайм-аута, поскольку мои записи получили 800k. java. net .SocketTimeoutException: время ожидания соединения 30 000 миллисекунд при подключении http-outgoing-16 [ACTIVE]

Я попытался разорвать переданный мне jsonList, но иногда у меня будет та же ошибка.

В настоящее время я использую версию Elasticsearch 7.6.2.

Трассировка исключения

java. net .SocketTimeoutException: 30 000 миллисекунд тайм-аут при подключении http-outgoing-16 [ACTIVE] в org.elasticsearch.client.RestClient. extractAndWrapCause (RestClient. java: 808) в org.elasticsearch.client.RestClient.performRequest (RestClient. java: 248) в org.elasticsearch.client.RestClient.performRequest (RestClient. java: 23) .elasticsearch.client. 1026 *: 1454) в org.elasticsearch.client.RestHighLevelClient.bulk (RestHighLevelClient. java: 497) в com.ESUtil.bulkInsert (ESUtil. java: 110) в org.download.App1.main (приложение 1). java: 167) в sun.reflect.NativeMethodAccessorImpl.invoke0 (собственный метод) в sun.reflect.NativeMethodAccessorImpl.invoke (неизвестный источник . 58) Причина: java. net .SocketTimeoutException: 30 000 миллисекунд тайм-аут при подключении http-outgoing-16 [ACTIVE] в org. apache .http.nio.protocol.HttpAsyncRequestExecutor.timeout (HttpAsyncRequestExecutor. * 1035 : 387) в орг. apache .http.impl.nio.client.InternalIODispatch.onTimeout (InternalIODispatch. java: 92) в орг. apache .http.impl.nio.client.InternalIODispatch.onTimeout (InternalIODispatch . java: 39) в орг. apache .http.impl.nio.reactor.AbstractIODispatch.timeout (AbstractIODispatch. java: 175) в орг. apache .http.impl.nio.reactor.BaseIOReactor .sessionTimedOut (BaseIOReactor. java: 261) в орг. apache .http.impl.nio.reactor.AbstractIOReactor.timeoutCheck (AbstractIOReactor. java: 502) в орг. apache .http.impl.nio .reactor.BaseIOReactor.validate (BaseIOReactor. java: 211) в орг. apache .ht tp.impl.nio.reactor.AbstractIOReactor.execute (AbstractIOReactor. java: 280) в org. apache .http.impl.nio.reactor.BaseIOReactor.execute (BaseIOReactor. java: 104) в орг. apache .http.impl.nio.reactor.AbstractMultiworkerIOReactor $ Worker.run (AbstractMultiworkerIOReactor. java: 591) в java .lang.Thread.run (неизвестный источник)

1 Ответ

0 голосов
/ 28 апреля 2020

Поскольку вы используете массовый API и отправляете огромное количество данных в Elasticsearch, а время ожидания по умолчанию для соединения составляет 30 seconds, а Elasticsearch не может завершить sh эту огромную массовую операцию в 30 seconds, следовательно, вы получаете это исключение.

Это время ожидания является нормальным для больших массовых API, и в вашем случае (указание индексации c) вы можете сделать следующее:

Обновите инфраструктуру и ускорите скорость индексации

Масштабирование вашего кластера ie, добавьте больше ЦП, памяти, улучшенного диска, отключите refresh_interval (по умолчанию 1 se c), чтобы ускорить массовое индексирование.

Увеличение времени ожидания Bulk API

Как упоминалось в Официальная ES делает c

request.timeout(TimeValue.timeValueMinutes(2));   --> 2 min timeout
request.timeout("2m");  --> string format of 2 sec.

Редактировать : как указано в комментарий, вы можете использовать syn c выполнение массового API , если вы хотите немедленно проверить ответ вашего массового API , ниже приведена цитата из того же do c:

Получить ответ операции (успешно или нет), может быть IndexResponse, UpdateResponse или DeleteResponse, которые все могут рассматриваться как экземпляры DocWriteResponse

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...