Я работаю над проектом Apache Beam, который столкнулся с проблемой со службой потока данных и PubsubIO, связанной с атрибутом пользовательской метки времени.Текущая версия Beam SDK: 2.7.0 .
. В проекте у нас есть 2 задания потока данных, взаимодействующих через тему и подписку PubSub:
Первоеконвейер (передача данных в PubSub)
Этот конвейер работает с сообщениями на основе, поэтому к нему не применена пользовательская стратегия окна, кроме GlobalWindows
(по умолчанию от Beam).В конце этого конвейера мы потопили (написали) все сообщения, которым уже была назначена карта атрибутов, включая их метку времени события (например, "ключ-публикация") к теме PubSubиспользуя PubsubIO.writeMessages()
.
Примечание: если мы используем PubsubIO.writeMessages().withTimestampAttribute()
, этот метод скажет PubsubIO.ShardFn
, PubsubIO.WriteFn
и PubsubClient
на запись / перезапись время обработки тонущего конвейера до этот атрибут на карте.
Второй конвейер (чтение данных из PubSub)
Во втором конвейере (чтение конвейера) мы попробовали PubsubIO.readMessagesWithAttributes().withTimestampAttribute("published_at")
и PubsubIO.readStrings().withTimestampAttribute("published_at")
для источника.
- При работе с DirectRunner все работало хорошо, как и ожидалось.Сообщения были прочитаны из подписки PubSub и выведены на последующие этапы с
ProcessContext.timestamp()
, равным их отметке времени события "published_at"
. - Но при работе с DataflowRunner ,
ProcessContext.timestamp()
всегда был установлен около реального времени , которое закрыто для времени обработки тонущего конвейера.Мы проверили и можем подтвердить, что эти временные метки были не от времени публикации PubSub .Все данные были тогда назначены неправильным окнам по сравнению с их отметкой времени домена событий.Мы ожидали, что поздние данные будут отброшены, чтобы их нельзя было назначить в недопустимые окна.
Примечание. Мы оставили тему Pubsub, заполненную значительным объемом данных, прежде чем включить второй конвейер дляиметь некоторые исторические / поздние данные.
Сообщения Pubsub с недопустимой меткой времени контекста
Предполагаемая основная причина
Глубже изучая исходный код DataflowRunner, мы видим, что Служба потоков данных использует совершенно другой код Pubsub (переопределяя PubsubIO.Read во время построения конвейера) для чтения и отправки в Pubsub .
Поэтому, если мы хотим использовать PubsubIO для Beam SDK, мы должны использовать экспериментальную опцию "enable_custom_pubsub_source"
.Но пока не повезло, поскольку мы столкнулись с этой проблемой https://jira.apache.org/jira/browse/BEAM-5674 и не смогли протестировать коды Pubsub Beam SDK.
Обходное решение
Наш текущий обходной путь заключается в том, что после шага, присваивающего окнам сообщениям, мы реализовали a DoFn
для проверки их метки времени по сравнению с IntervalWindow
. Если окна недействительны , тогда мы просто отбрасываем сообщения , а затем запускаем еженедельные или полнедельные задания, чтобы исправить их из исторического источника.Лучше иметь некоторые пропущенные данные, чем неправильно рассчитанные.
Сообщения, отброшенные из-за неправильных окон
Пожалуйста, поделитесь с нами опытом по этому делу.Мы знаем, что с точки зрения управления водяными знаками в потоке данных водяной знак, как говорят, приспосабливается к текущему реальному времени, если загружаемые данные разбросаны (не достаточно плотно с течением времени).
Мы также считаем, что мы неправильно понимаемкое-что о том, как служба Dataflow поддерживает временную метку вывода PubsubUnboundedSource, поскольку мы все еще новичок в Apache Beam и Google Dataflow, поэтому есть вещи, о которых мы еще не узнали.
Большое спасибо!