Apache Луч не удаляет недействительный элемент из подписки - PullRequest
0 голосов
/ 08 января 2020

Только что понял, что мой конвейер неверен, когда дело доходит до ошибочных событий, они продолжают обрабатываться и никогда не удаляются из подписки.

По сути, у меня есть простой конвейер, который содержит триггер, который будет тянуть эти события в файле.

На одном из этапов он обрабатывает полезную нагрузку сообщения, полученного через PubSub, и направляет его на следующие этапы. Однако в некоторых случаях это не удается.

        pipeline
        .apply("Read PubSub Events",
            PubsubIO.readMessagesWithAttributes().fromSubscription(options.getSubscription()))
        .apply("Map to MyClass",
            ParDo.of(new PubSubMessageToMyClass())) // Exception thrown in this stage.
        .apply("Apply Timestamps", WithTimestamps.of(new SetTimestampFn()).withAllowedTimestampSkew(new Duration(Long.MAX_VALUE)))
        ...
        );

Когда возникает ошибка, я продолжаю видеть одно и то же событие снова и снова в конвейере, как будто оно никогда не завершает обработку.

Есть ли способ явно указать Apache Beam, чтобы сделать недействительным данное сообщение и предотвратить дальнейшую неудачную обработку?

1 Ответ

2 голосов
/ 08 января 2020

Поток данных обрабатывает элементы в произвольных пакетах и ​​повторяет весь пакет, когда выдается ошибка для любого элемента в этом пакете. При работе в пакетном режиме пакеты, включающие неисправный элемент, повторяются 4 раза. Конвейер завершится неудачей, если один пакет вышел из строя 4 раза. При работе в потоковом режиме пакет, включающий сбойный элемент, будет повторяться бесконечно , что может привести к тому, что ваш конвейер будет постоянно stall.

Рассмотрите возможность защиты от ошибок в вашем коде, добавив обработчики исключений. Например, если вы хотите удалить элементы, которые не проходят какую-либо пользовательскую проверку ввода, выполненную в ParDo, используйте блок try / catch в ParDo для обработки исключения и отбросьте элемент.

...