Только что понял, что мой конвейер неверен, когда дело доходит до ошибочных событий, они продолжают обрабатываться и никогда не удаляются из подписки.
По сути, у меня есть простой конвейер, который содержит триггер, который будет тянуть эти события в файле.
На одном из этапов он обрабатывает полезную нагрузку сообщения, полученного через PubSub, и направляет его на следующие этапы. Однако в некоторых случаях это не удается.
pipeline
.apply("Read PubSub Events",
PubsubIO.readMessagesWithAttributes().fromSubscription(options.getSubscription()))
.apply("Map to MyClass",
ParDo.of(new PubSubMessageToMyClass())) // Exception thrown in this stage.
.apply("Apply Timestamps", WithTimestamps.of(new SetTimestampFn()).withAllowedTimestampSkew(new Duration(Long.MAX_VALUE)))
...
);
Когда возникает ошибка, я продолжаю видеть одно и то же событие снова и снова в конвейере, как будто оно никогда не завершает обработку.
Есть ли способ явно указать Apache Beam, чтобы сделать недействительным данное сообщение и предотвратить дальнейшую неудачную обработку?