Я пытаюсь использовать обработку состояния Beam в Dataflow, но я получаю эти ошибки в журнале каждый раз, когда пытаюсь вывести данные. В результате ничего не выводится из состояния ParDo
+ DoFn
:
16:45:56.948 CEST Proposing dynamic split of work unit myproject;2020-03-31_07_34_07-7523868393961495218;8536385410242733529 at {"fractionConsumed":0.5}
16:45:56.948 CEST Rejecting split request because custom reader returned null residual source.
Edit Это кажется случайным. кажется , как если бы ParDo
с состоянием не выводил никаких элементов, пока не сработает окно. Это правильно?
В этом примере реплицируется ошибка с .batchByKey
Scio (используется скрытая обработка с сохранением состояния):
val create = Create.of(()).withCoder(CoderMaterializer.beam(sc, Coder[Unit]))
sc.customInput("Unit input", create)
.map(_ => println("STARTING"))
.applyTransform(ParDo.of(new Increasing)) // Outputs infinite stream of increasing numbers, one per second, prints each number to stdout
.keyBy(1 -> _)
.batchByKey(5)
.map {
case (key, vs) => vs.foreach(v => println(s"GOT batch with $v"))
}
sc.run()
Финальный .map
, который является просто ParDo
+ DoFn
с одним выходом, никогда не запускается.
В выводе я вижу пять строк увеличивающихся чисел (от new Increasing
), за которыми следуют два сообщения выше. Это повторяется.
Кто-нибудь знает, в чем может быть ошибка? Это кажется источником apache / beam /../ WorkerCustomSources.java # L698