Я смог воспроизвести проблему с DirectRunner
и простым конвейером, который читает из Pub / Sub, использует первое слово сообщения в качестве ключа, применяет GroupByKey
и затем регистрирует записи. Похоже, шаг GBK ожидает поступления всех данных и, поскольку это неограниченный источник, не дает никакого результата. Для меня сработало определение стратегии управления окнами с помощью триггеров, таких как:
object PubSubTest {
private lazy val log = LoggerFactory.getLogger(this.getClass)
def main(cmdlineArgs: Array[String]): Unit = {
val (sc, args) = ContextAndArgs(cmdlineArgs)
val defaultInputSub = "test_sub"
val subscription = args.getOrElse("input", defaultInputSub)
val project = "PROJECT_ID"
sc.pubsubSubscription[String](s"projects/$project/subscriptions/$subscription")
// provide window options including triggering
.withFixedWindows(duration = Duration.standardSeconds(10), options = WindowOptions(
trigger = Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(Duration.standardSeconds(2))),
accumulationMode = AccumulationMode.ACCUMULATING_FIRED_PANES,
closingBehavior = ClosingBehavior.FIRE_IF_NON_EMPTY,
allowedLateness = Duration.standardSeconds(0))
)
// use first word of the Pub/Sub message as the key
.keyBy(a => a.split(" ")(0))
.groupByKey
.map(entry => log.info(s"${entry._1} was repeated ${entry._2.size} times"))
val result = sc.close().waitUntilFinish()
}
}