Flink - ElasticSearch Sink - обработка ошибок - PullRequest
0 голосов
/ 09 февраля 2019

Я пытаюсь следовать этому руководству по Flink [1] для обработки ошибок в ElasticSearchSink путем повторного добавления ошибочных сообщений в очередь.Сценарии ошибок, которые я получил и собираюсь повторить: (i) конфликт в версии документа UpdateRequest и (ii) потеря подключения к ElasticSearch.Эти ошибки, как ожидается, будут непостоянными, будут решаться путем (i) изменения версии / (ii) после нескольких секунд. Я ожидаю, что сообщение будет успешно повторено.На самом деле я получил: Flink, похоже, застрял на этой (первой) повторной попытке, мой поток был поставлен в очередь (противодавление равно 1 везде), вся обработка зависла.

Вот мой код обработки ошибок:

private object MyElasticSearchFailureHandler extends ActionRequestFailureHandler {
    override def onFailure(actionRequest: ActionRequest, failure: Throwable, restStatusCode: Int, indexer: RequestIndexer): Unit = {
        if (ExceptionUtils.findThrowableWithMessage(failure, "version_conflict_engine_exception") != Optional.empty()) {
            actionRequest match {
                case s: UpdateRequest =>
                    LOG.warn(s"Failed inserting record to ElasticSearch due to version conflict (${s.version()}). Retrying")
                    LOG.warn(actionRequest.toString)
                    indexer.add(s.version(s.version() + 1))
                case _ =>
                    LOG.error("Failed inserting record to ElasticSearch due to version conflict. However, this is not an Update-Request. Don't know why.")
                    LOG.error(actionRequest.toString)
                    throw failure
            }
        } else if (restStatusCode == -1 && failure.getMessage.contains("Connection closed")) {
            LOG.warn(s"Retrying record: ${actionRequest.toString}")
            actionRequest match {
                case s: UpdateRequest => indexer.add(s)
                case s: IndexRequest => indexer.add(s)
            }
        } else {
            LOG.error(s"ELASTICSEARCH FAILED:\n    statusCode $restStatusCode\n    message: ${failure.getMessage}\n${failure.getStackTrace}")
            LOG.error(s"    DATA:\n    ${actionRequest.toString}")
            throw failure
        }
    }
}

Вот выдержка из моих журналов диспетчера задач:

2019-02-09 04: 12: 35.676 [Диспетчер ввода-вывода 25] ОШИБКА oafsconnectors.elasticsearch.ElasticsearchSinkBase - ОшибкаМассовый запрос Elasticsearch: Соединение закрыто], doc_as_upsert [true], doc [index { [null] [null] [null] , source [{...}]}], scripted_upsert [false], detect_noop [true]} 2019-02-09 04: 12: 54.242 [Sink: S3 - Historical (1/4)] INFO oaflink.streaming.api.functions.sink.filesystem.Buckets - Подзадача 0 контрольной точки для контрольной точки с id = 24 (максимальный счетчик частей = 26).

И журналы работы менеджера:

2019-02-09 03: 59: 37.880 [flink-akka.actor.default-dispatcher-4] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Завершена контрольная точка 23 для задания 1a1438ca23387c4ef9a59ff9da6dafa1 (430392865 байт за 307078 мс).2019-02-09 04: 09: 30.970 [Таймер контрольной точки] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Запуск контрольной точки 24 @ 1549685370776 для задания 1a1438ca23387c4ef9a59ff9da6dafa1.2019-02-09 04: 17: 00.970 [Таймер контрольной точки] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - до завершения 24-й контрольной точки задания 1a1438ca23387c4ef9a59ff9da6dafa1 истек.2019-02-09 04: 24: 31.035 [Таймер контрольной точки] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - запуск контрольной точки 25 @ 1549686270776 для задания 1a1438ca23387c4ef9a59ff9da6dafa1.2019-02-09 04: 32: 01.035 [Таймер контрольной точки] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - до завершения 25-й контрольной точки задания 1a1438ca23387c4ef9a59ff9da6dafa1 истек.2019-02-09 04: 39: 30.961 [Таймер контрольной точки] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Активация контрольной точки 26 @ 1549687170776 для задания 1a1438ca23387c4ef9a59ff9da6dafa1.

Спасибо, наилучшие условияAverell

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/elasticsearch.html#handling-failing-elasticsearch-requests

...