version_conflict_engine_exception with _update_by_query - PullRequest
0 голосов
/ 17 января 2020

Я использую обновление ElasticSearch по API запросов во flink, параллелизм flink равен 1. Но я получил version_conflict_engine_exception, это мой код в flink RichSinkFunction так:

        UpdateByQueryRequestBuilder builder = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
        builder.abortOnVersionConflict(true);
        builder.source(indexName);
        builder.filter(filter);
        builder.setMaxRetries(MAX_RETRIES);
        builder.refresh(true);

        String updateTime = Instant.ofEpochMilli(ts).atZone(ZoneId.systemDefault())
                .format(ELASTIC_SEARCH_DATE_TIME_FORMATTER);

        Map<String, Object> params = Maps.newHashMap();
        params.put("fieldName", fieldName);
        params.put("updateTime", updateTime);
        params.put("model", this.transformMap(JacksonUtils.convertValue(model, new TypeReference<Map<String, Object>>() {
        })));

        builder.script(new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, UPDATE_BY_MODEL_PAINLESS_CODE, params));
        BulkByScrollResponse response = builder.get();

Я могу быть уверен, что только это приложение Доступ к Elasticsearch, параллелизм flink равен 1, как при однопоточном вызове обновления через API запросов? Почему я получил исключение version_conflict_engine_exception? и как сделать ровно один раз?

1 Ответ

0 голосов
/ 17 января 2020

Я вижу две возможности:

  1. Запущено что-то еще, что может обновить документ.
  2. Приемник эластичного поиска Flink обеспечивает как минимум однократные гарантии, что означает, что в случае В случае сбоя приемник иногда выполняет повторные записи во время восстановления. Возможно, это может привести к попыткам обновить документ с использованием устаревшего номера версии.
...