Я написал конвейер Beam в Go, который успешно работает на моем локальном компьютере, но когда я добавляю --runner=dataflow
для запуска его в облачном потоке данных Google, я получаю неопределенную ошибку, когда он устанавливает, что отсутствует ParDo ParDoPayload. Трассировка стека полностью Java, поэтому я не уверен, как перевести это обратно в мой код Go, чтобы выяснить, что мне не хватает.
Я прошел и использовал beam.RegisterFunction()
для всех моих функций, которые emit
, а также использовали beam.RegisterType()
для структуры верхнего уровня, которую я передаю.
Любые идеи, как эта ошибка связана с кодом, который я написал / как я могу отладка?
java.lang.RuntimeException: ParDo did not have a ParDoPayload
at org.apache.beam.runners.dataflow.worker.graph.RegisterNodeFunction.apply(RegisterNodeFunction.java:327)
at org.apache.beam.runners.dataflow.worker.graph.RegisterNodeFunction.apply(RegisterNodeFunction.java:97)
at java.util.function.Function.lambda$andThen$1(Function.java:88)
at org.apache.beam.runners.dataflow.worker.graph.CreateRegisterFnOperationFunction.apply(CreateRegisterFnOperationFunction.java:207)
at org.apache.beam.runners.dataflow.worker.graph.CreateRegisterFnOperationFunction.apply(CreateRegisterFnOperationFunction.java:74)
at java.util.function.Function.lambda$andThen$1(Function.java:88)
at java.util.function.Function.lambda$andThen$1(Function.java:88)
at org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:346)
at org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:305)
at org.apache.beam.runners.dataflow.worker.DataflowRunnerHarness.start(DataflowRunnerHarness.java:195)
at org.apache.beam.runners.dataflow.worker.DataflowRunnerHarness.main(DataflowRunnerHarness.java:123)
Caused by: org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.InvalidProtocolBufferException: Protocol message had invalid UTF-8.
at org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.InvalidProtocolBufferException.invalidUtf8(InvalidProtocolBufferException.java:141)
at org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.Utf8$DecodeUtil.handleTwoBytes(Utf8.java:1909)
at org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.Utf8$DecodeUtil.access$700(Utf8.java:1883)
at org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.Utf8$UnsafeProcessor.decodeUtf8(Utf8.java:1411)
at org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.Utf8.decodeUtf8(Utf8.java:340)
at org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.CodedInputStream$ArrayDecoder.readStringRequireUtf8(CodedInputStream.java:804)
at org.apache.beam.model.pipeline.v1.RunnerApi$FunctionSpec.<init>(RunnerApi.java:55936)
at org.apache.beam.model.pipeline.v1.RunnerApi$FunctionSpec.<init>(RunnerApi.java:55897)
at org.apache.beam.model.pipeline.v1.RunnerApi$FunctionSpec$1.parsePartialFrom(RunnerApi.java:56565)
at org.apache.beam.model.pipeline.v1.RunnerApi$FunctionSpec$1.parsePartialFrom(RunnerApi.java:56559)
at org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.CodedInputStream$ArrayDecoder.readMessage(CodedInputStream.java:883)
at org.apache.beam.model.pipeline.v1.RunnerApi$ParDoPayload.<init>(RunnerApi.java:10363)
at org.apache.beam.model.pipeline.v1.RunnerApi$ParDoPayload.<init>(RunnerApi.java:10320)
at org.apache.beam.model.pipeline.v1.RunnerApi$ParDoPayload$1.parsePartialFrom(RunnerApi.java:12633)
at org.apache.beam.model.pipeline.v1.RunnerApi$ParDoPayload$1.parsePartialFrom(RunnerApi.java:12627)
at org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:100)
at org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:120)
at org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:125)
at org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:48)
at org.apache.beam.model.pipeline.v1.RunnerApi$ParDoPayload.parseFrom(RunnerApi.java:11130)
at org.apache.beam.runners.dataflow.worker.graph.RegisterNodeFunction.apply(RegisterNodeFunction.java:325)
... 10 more