PubsubIO не выводит пользовательский атрибут метки времени в качестве context.timestamp при работе со службами DataflowRunner и Dataflow - PullRequest
0 голосов
/ 08 октября 2018

Я работаю над проектом Apache Beam, который столкнулся с проблемой со службой потока данных и PubsubIO, связанной с атрибутом пользовательской метки времени.Текущая версия Beam SDK: 2.7.0 .

. В проекте у нас есть 2 задания потока данных, взаимодействующих через тему и подписку PubSub:

Первоеконвейер (передача данных в PubSub)

Этот конвейер работает с сообщениями на основе, поэтому к нему не применена пользовательская стратегия окна, кроме GlobalWindows (по умолчанию от Beam).В конце этого конвейера мы потопили (написали) все сообщения, которым уже была назначена карта атрибутов, включая их метку времени события (например, "ключ-публикация") к теме PubSubиспользуя PubsubIO.writeMessages().

Примечание: если мы используем PubsubIO.writeMessages().withTimestampAttribute(), этот метод скажет PubsubIO.ShardFn, PubsubIO.WriteFn и PubsubClient на запись / перезапись время обработки тонущего конвейера до этот атрибут на карте.

Второй конвейер (чтение данных из PubSub)

Во втором конвейере (чтение конвейера) мы попробовали PubsubIO.readMessagesWithAttributes().withTimestampAttribute("published_at") и PubsubIO.readStrings().withTimestampAttribute("published_at") для источника.

  • При работе с DirectRunner все работало хорошо, как и ожидалось.Сообщения были прочитаны из подписки PubSub и выведены на последующие этапы с ProcessContext.timestamp(), равным их отметке времени события "published_at".
  • Но при работе с DataflowRunner , ProcessContext.timestamp()всегда был установлен около реального времени , которое закрыто для времени обработки тонущего конвейера.Мы проверили и можем подтвердить, что эти временные метки были не от времени публикации PubSub .Все данные были тогда назначены неправильным окнам по сравнению с их отметкой времени домена событий.Мы ожидали, что поздние данные будут отброшены, чтобы их нельзя было назначить в недопустимые окна.

Примечание. Мы оставили тему Pubsub, заполненную значительным объемом данных, прежде чем включить второй конвейер дляиметь некоторые исторические / поздние данные.

Сообщения Pubsub с недопустимой меткой времени контекста

Предполагаемая основная причина

Глубже изучая исходный код DataflowRunner, мы видим, что Служба потоков данных использует совершенно другой код Pubsub (переопределяя PubsubIO.Read во время построения конвейера) для чтения и отправки в Pubsub .

Поэтому, если мы хотим использовать PubsubIO для Beam SDK, мы должны использовать экспериментальную опцию "enable_custom_pubsub_source".Но пока не повезло, поскольку мы столкнулись с этой проблемой https://jira.apache.org/jira/browse/BEAM-5674 и не смогли протестировать коды Pubsub Beam SDK.

Обходное решение

Наш текущий обходной путь заключается в том, что после шага, присваивающего окнам сообщениям, мы реализовали a DoFn для проверки их метки времени по сравнению с IntervalWindow. Если окна недействительны , тогда мы просто отбрасываем сообщения , а затем запускаем еженедельные или полнедельные задания, чтобы исправить их из исторического источника.Лучше иметь некоторые пропущенные данные, чем неправильно рассчитанные.

Сообщения, отброшенные из-за неправильных окон

Пожалуйста, поделитесь с нами опытом по этому делу.Мы знаем, что с точки зрения управления водяными знаками в потоке данных водяной знак, как говорят, приспосабливается к текущему реальному времени, если загружаемые данные разбросаны (не достаточно плотно с течением времени).

Мы также считаем, что мы неправильно понимаемкое-что о том, как служба Dataflow поддерживает временную метку вывода PubsubUnboundedSource, поскольку мы все еще новичок в Apache Beam и Google Dataflow, поэтому есть вещи, о которых мы еще не узнали.

Большое спасибо!

1 Ответ

0 голосов
/ 20 октября 2018

Я нашел решение этой проблемы.В моем тонущем конвейере атрибут timestamp установлен с неверным форматом даты по сравнению со стандартом RFC 3339.Отформатированные даты пропустили символ «Z».Мы либо исправили символ «Z», либо изменили использование миллисекунд с начала эпохи.Оба работали хорошо.

Но одна вещь состоит в том, что, когда служба потока данных не могла проанализировать неправильные форматы даты, она выдала предупреждение или выдавала ошибку, а вместо этого занимала время обработки для всех элементов, поэтому они были назначены неправильнымокна event_time.

...