Scio / Apache beam, как отобразить сгруппированные результаты - PullRequest
0 голосов
/ 14 сентября 2018

У меня есть простой конвейер, который читает из pubsub в фиксированном окне, анализирует сообщения и группирует их по определенному свойству. Однако, если я map после groupBy, моя функция, кажется, не выполняется.

Я что-то упустил?

sc.pubsubSubscription[String](s"projects/$project/subscriptions/$subscription")
  .withFixedWindow(Duration.standardSeconds(windowSeconds))
  .map(parseMessage)
  .groupBy(_.ip_address)
  .map(entry => log.info(s"${entry._1} was repeated ${entry._2.size} times"))

1 Ответ

0 голосов
/ 16 сентября 2018

Я смог воспроизвести проблему с DirectRunner и простым конвейером, который читает из Pub / Sub, использует первое слово сообщения в качестве ключа, применяет GroupByKey и затем регистрирует записи. Похоже, шаг GBK ожидает поступления всех данных и, поскольку это неограниченный источник, не дает никакого результата. Для меня сработало определение стратегии управления окнами с помощью триггеров, таких как:

object PubSubTest {
  private lazy val log = LoggerFactory.getLogger(this.getClass)

  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)

    val defaultInputSub = "test_sub"
    val subscription = args.getOrElse("input", defaultInputSub)
    val project = "PROJECT_ID"

    sc.pubsubSubscription[String](s"projects/$project/subscriptions/$subscription")
      // provide window options including triggering
      .withFixedWindows(duration = Duration.standardSeconds(10), options = WindowOptions(
        trigger = Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
          .plusDelayOf(Duration.standardSeconds(2))),
        accumulationMode = AccumulationMode.ACCUMULATING_FIRED_PANES,
        closingBehavior = ClosingBehavior.FIRE_IF_NON_EMPTY,
        allowedLateness = Duration.standardSeconds(0))
      )
      // use first word of the Pub/Sub message as the key
      .keyBy(a => a.split(" ")(0))
      .groupByKey
      .map(entry => log.info(s"${entry._1} was repeated ${entry._2.size} times"))

    val result = sc.close().waitUntilFinish()
  }
}
...