Слейте задание DataFlow и запустите еще одно сразу после, причиной дублирования сообщения - PullRequest
1 голос
/ 04 апреля 2019

У меня есть задание потока данных, которое подписалось на сообщения от PubSub:

p.apply("pubsub-topic-read", PubsubIO.readMessagesWithAttributes()


.fromSubscription(options.getPubSubSubscriptionName()).withIdAttribute("uuid"))

Я вижу в документах, что нет гарантии отсутствия дублирования, и Beam предлагает использовать withIdAttribute.

Это прекрасно работает, пока я не истощу существующую работу, подожду, пока она не будет завершена, и перезапущу другую, а затем я вижу миллионы дублирующих записей BigQuery (моя работа записывает сообщения PubSub в BigQuery).

Есть идеи, что я делаю не так?

1 Ответ

3 голосов
/ 05 апреля 2019

Я думаю, что вы должны использовать функцию обновления вместо использования стока для остановки конвейера и запуска нового конвейера.В последнем подходе состояние не разделяется между двумя конвейерами, поэтому Dataflow не может идентифицировать сообщения, уже доставленные из PubSub.Благодаря функции обновления вы сможете продолжить конвейер без дублирования сообщений.

...