Почему я получаю много исключений None.get при сливе потокового конвейера? - PullRequest
0 голосов
/ 18 января 2019

Я сталкиваюсь с проблемами, когда у меня есть потоковый конвейер scio, работающий на потоке данных, который дедуплицирует сообщения и выполняет некоторый подсчет по ключу. Когда я пытаюсь опустошить конвейер, я получаю большое количество None.get исключений, предположительно выбрасываемых на моем дедуплицирующем шаге (я основываю это предположение на ярлыке, который наблюдаю в журнале стекового драйвера).

В настоящее время мы работаем на scio версии 0.7.0-beta1 и лучевой версии 2.8.0. Я пытался защитить в своем коде как можно больше от любых потенциальных None, но похоже, что это происходит дальше внутри дедуплицирующего шага.

Я получаю следующую ошибку:

"java.util.NoSuchElementException: None.get
    at scala.None$.get(Option.scala:347)
    at scala.None$.get(Option.scala:345)
    at com.spotify.scio.util.Functions$$anon$2.mergeAccumulators(Functions.scala:227)
    at com.spotify.scio.util.Functions$$anon$2.mergeAccumulators(Functions.scala:220)
    at org.apache.beam.runners.dataflow.worker.WindmillStateInternals$WindmillCombiningState.getAccum(WindmillStateInternals.java:958)
    at org.apache.beam.runners.dataflow.worker.WindmillStateInternals$WindmillCombiningState.read(WindmillStateInternals.java:920)
    at org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:125)
    at org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1060)
    at org.apache.beam.runners.core.ReduceFnRunner.onTimers(ReduceFnRunner.java:768)
    at org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:95)
    at org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:42)
    at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:115)
    at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
    at org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
    at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:135)
    at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:45)
    at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:50)
    at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:202)
    at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:160)
    at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
    at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1226)
    at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:141)
    at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:965)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Как видите, это никогда не входит в мой код, и я не уверен, как мне найти эту проблему. Возможно, это как-то связано с «LateDataDroppingDoFnRunner»? Допустимая задержка относительно велика (3 дня с часами в окнах).

val input = PubsubIO.readStrings()
  .fromSubscription(subscription)
  .withTimestampAttribute("ts")
  .withName("Window messages")
    .withFixedWindows(
      duration = windowSize,
      options = WindowOptions(
        trigger = AfterWatermark.pastEndOfWindow()
          .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
            .plusDelayOf(earlyFiring))
          .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
            .plusDelayOf(lateFiring)),
        accumulationMode = ACCUMULATING_FIRED_PANES,
        allowedLateness = allowedLateness
      )
    )
  .withName(s"Deduplicate messages")
  .distinctBy[String](f = getId)

...
// I am being overly cautious here because I have been having
// so much trouble debugging this
def getId(message: Map[String, Any]): String = {
  message match {
    case null => {
     logger.warn("message is null when getting id")
      ""
    }
    case message => {
      message.get("id") match {
        case None => {
          logger.warn("id is null in message")
          ""
        }
        case id => id.get.toString
      }
    }
  }
}

Я запутался, как я мог получить здесь None.get и почему это могло произойти, только когда я сливаю.

Могу ли я дать несколько советов о том, как мне следует отладить эту ошибку или где я должен искать?

...