Как исправить невозможность сериализации DoFnWithExecutionInformation - PullRequest
0 голосов
/ 22 марта 2020

Пытаясь запустить конвейер с DirectRunner в Scio / Beam, я получаю следующую ошибку:

java.lang.IllegalArgumentException: unable to serialize DoFnWithExecutionInformation{doFn=anonymous function map@{Foo.scala:59}:1, mainOutputTag=Tag<output>, sideInputMapping={}, schemaInformation=DoFnSchemaInformation{elementConverters=[]}}
  at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:55)
  at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.translateDoFn(ParDoTranslation.java:606)
  at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation$1.translateDoFn(ParDoTranslation.java:216)
  at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.payloadForParDoLike(ParDoTranslation.java:738)
  at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.translateParDo(ParDoTranslation.java:212)
  at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.translateParDo(ParDoTranslation.java:191)
  at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation$ParDoTranslator.translate(ParDoTranslation.java:128)
  at org.apache.beam.repackaged.direct_java.runners.core.construction.PTransformTranslation.toProto(PTransformTranslation.java:225)
  at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.getParDoPayload(ParDoTranslation.java:689)
  at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.isSplittable(ParDoTranslation.java:704)
  at org.apache.beam.repackaged.direct_java.runners.core.construction.PTransformMatchers$6.matches(PTransformMatchers.java:269)
  at org.apache.beam.sdk.Pipeline$2.visitPrimitiveTransform(Pipeline.java:282)
  at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:665)
  at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
  at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
  at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:317)
  at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:251)
  at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:460)
  at org.apache.beam.sdk.Pipeline.replace(Pipeline.java:260)
  at org.apache.beam.sdk.Pipeline.replaceAll(Pipeline.java:210)
  at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:170)
  at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
  at org.apache.beam.sdk.Pipeline.run(Pipeline.java:315)
  at org.apache.beam.sdk.Pipeline.run(Pipeline.java:301)
  at com.spotify.scio.ScioContext.execute(ScioContext.scala:606)
  at com.spotify.scio.ScioContext$$anonfun$run$1.apply(ScioContext.scala:594)
  at com.spotify.scio.ScioContext$$anonfun$run$1.apply(ScioContext.scala:582)
  at com.spotify.scio.ScioContext.requireNotClosed(ScioContext.scala:702)
  at com.spotify.scio.ScioContext.run(ScioContext.scala:582)

Код выглядит так:

object Foo {
...
    scol.map { case (day, labeled, unlabeled) => writeFoo(gcpStorage, BlobId.of(bucket, path)) }
...
  def writeFoo(gcpStorage: Storage, blobId: BlobId) {
...
}

Не уверен, в чём на самом деле проблема. Пробовал встроить тело `writeFoo` прямо в лямбду, но всё равно не работает.

...