Я хочу реализовать следующий сценарий с использованием потокового конвейера в Apache Beam (и запуск его в Google DataFlow):
- Чтение сообщений из Pub / Sub (строки JSON)
- Десериализация JSON
- Использование настраиваемого поля (скажем,
timeStamp
) в качестве значения метки времени для элемента обработки - Применение фиксированного управления окнами для
60 seconds
- Извлечение ключа из элементов и группыпо ключу
- << выполнить дальнейшую обработку >>
Я пытался решить эту проблему, используя как Java (Scala), так и Python, но ни одно из решений не сработало.
- Решение Python
# p is beam.Pipeline()
_ = (p | beam.io.ReadFromPubSub(subscription="my_sub")
| beam.Map(add_timestamping)
| beam.WindowInto(window.FixedWindows(60))
| beam.Map(lambda elem: elem) # exracting the key somehow, not relevant here
| beam.GroupByKey()
# (...)
| beam.io.WriteToPubSub("output_topic")
)
p.run()
add_timestamping
функция согласно документация :
def add_timestamping(elem):
import json
import apache_beam as beam
msg = json.loads(elem)
unix_timestamp = msg['timeStamp'] / 1000
return beam.window.TimestampedValue(msg, unix_timestamp)
Вывод решения Python :
- При использовании
DirectRunner
окна испускаются, и само окно более или менее подходит в зависимости от задержки. - При использовании
DataFlowRunner
, ВСЕ сообщения пропускаются со счетчиком, появляющимся в интерфейсе DataFlow: dropDueToLateness .
Решение Java / Scala (я использовал
Scio , но это также происходит в чистом Beam SDK и в Java)
sc.pubsubSubscription[String]("my_sub")
.applyTransform(ParDo.of(new CustomTs()))
.withFixedWindows(Duration.standardSeconds(60))
.map(x => x) // exracting the key somehow, not relevant here
.groupByKey
// (...)
.saveAsPubsub("output_topic")
Добавлениепользовательская отметка времени согласно документации :
import io.circe.parser._
class CustomTs extends DoFn[String, String] {
@ProcessElement
def processElement(@Element element: String, out: DoFn.OutputReceiver[String]): Unit = {
val json = parse(element).right.get
val timestampMillis: Long = json.hcursor.downField("timeStamp").as[Long].getOrElse(0)
out.outputWithTimestamp(element, new Instant(timestampMillis))
}
}
Вывод решения Java / Scala :
Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException:
java.lang.IllegalArgumentException:
Cannot output with timestamp 2019-03-02T00:51:39.124Z.
Output timestamps must be no earlier than the timestamp of the current input
(2019-03-28T14:57:53.195Z) minus the allowed skew (0 milliseconds).
Я не могу использовать DoFn.getAllowedTimestampSkew
здесьпоскольку это уже устарело, и я не знаю, какие диапазоны исторических данных будут отправлены.
Возможность обработки исторических данных имеет решающее значение для моего проекта (эти данные будут отправлены в Pub / Subиз какого-то магазина).Конвейер должен работать как с текущими данными, так и с историческими.
Мой вопрос: Как обрабатывать данные с использованием пользовательских временных меток с возможностью работы с окнами, определенными с помощью API Beam?