Beam / DataFlow :: ReadFromPubSub (id_label) :: Неожиданное поведение - PullRequest
0 голосов
/ 16 марта 2019

Может кто-нибудь уточнить, для чего нужен аргумент id_label в преобразовании ReafFromPubSub ?

Я использую приемник BigQuery, насколько я понимаю, он действует как insertId для BQ Streaming API, Tabledata: insertAll

Уникальный идентификатор для каждой строки.BigQuery использует это свойство для выявления дублирующих запросов на вставку.Для получения дополнительной информации см. Согласованность данных.

Однако я не вижу этого ожидаемого поведения.

  • Я публикую сообщения в Pub / Sub, каждыйсообщение с тем же атрибутом message_id значение (это намеренно для проверки поведения конвейера / дедупликации BQ)

  • Мой конвейер читает из пабов следующим образом beam.io.gcp.pubsub.ReadFromPubSub(topic=TOPIC, subscription=None, id_label='message_id')

но все еще запрашивая BQ, все сообщения вставляются.Я ожидал, потому что каждое сообщение, опубликованное с тем же значением message_id, BQ должно было выводить эти ...

Может кто-то уточнить PLS?Заранее спасибо!

Кроме того, я замечаю, что DirectRunner продолжает выдавать ошибку при использовании этого атрибута,

NotImplementedError: DirectRunner: id_label не поддерживается для чтения PubSub

Я должен использовать DataflowRunner ... это также ожидается?

Приветствия!

ОБНОВЛЕНИЕ 1 : перемещено в DataflowRunner, иконвейер, кажется, уважает аргумент id_label во время ReadFromPubSub (). Тем не менее, дубликаты сообщений продолжают время от времени считываться в конвейер .

уведомление, я также передаю то же message_id значение (= '2') в атрибуте сообщения (это намерение попробовать, проверить выводповедение).

  • мой конвейер (работает на Dataflow Runner, луч Python v2.11 SDK, код конвейера здесь ), выводит следующее сообщение в BQ.Как видите, несколько сообщений с одинаковым message_id считываются в конвейер и отправляются в приемник.Обычно это происходит, когда я останавливаю / перезапускаю свое приложение издателя.
cid=141&message_id=2&evt_time=2019-03-17T09:31:15.792653Z
cid=141&message_id=2&evt_time=2019-03-17T09:30:00.767878Z
cid=141&message_id=2&evt_time=2019-03-17T09:28:30.747951Z
cid=141&message_id=2&evt_time=2019-03-17T09:22:30.668764Z
cid=141&message_id=2&evt_time=2019-03-17T09:21:00.646867Z
cid=141&message_id=2&evt_time=2019-03-17T09:19:45.630280Z
cid=141&message_id=2&evt_time=2019-03-17T09:12:05.466953Z
cid=141&message_id=2&evt_time=2019-03-17T09:10:42.956195Z
cid=141&message_id=2&evt_time=2019-03-17T09:01:42.816151Z

1 Ответ

1 голос
/ 16 марта 2019

Это разные идентификаторы. Как объяснено здесь , каждое сообщение, опубликованное в теме, имеет поле с именем messageId, которое гарантированно будет уникальным в данной теме. Pub / Sub гарантирует не менее чем однократную доставку , поэтому подписка может иметь дубликаты (то есть сообщения с одинаковыми messageId). Поток данных имеет семантику точной обработки, поскольку он использует это поле для дедупликации сообщений при чтении из подписки. Это не зависит от приемника, который не обязательно должен быть BigQuery.

Используя id_label (или .withIdAttribute() в Java SDK), мы можем заставить сообщения считаться дублирующимися в соответствии с другим полем, которое должно быть уникальным (например, идентификатор заказа, идентификатор клиента и т. Д.). Источник ввода будет читать повторные сообщения только один раз, вы не увидите, как они увеличивают количество элементов ввода в конвейере. Имейте в виду, что Direct Runner предназначен только для тестирования и не дает таких же гарантий с точки зрения контрольных точек, дедупликации и т. Д. В качестве примера см. этот комментарий, Это наиболее вероятная причина того, почему вы видите их в конвейере, также принимая во внимание сообщения NotImplementedError, поэтому я бы предложил перейти к Dataflow Runner.

С другой стороны, insertId используется на основе по максимуму , чтобы избежать дублирования строк при повторной попытке потоковых вставок в BigQuery. При использовании BigQueryIO он создается под капотом и не может быть указан вручную. В вашем случае, если ваши N-сообщения поступают в конвейер, а N записываются в BigQuery, это работает как положено. Если нужно было повторить попытку, строка имела тот же insertId и поэтому была отброшена.

...