Я использую Apache Beam
2.18. Когда я запускаю свой код в DirectRunner, я могу получить messageId и подробности сообщения. Однако после того, как я развернул свой код в потоке данных, иногда я получаю неправильный messageId
и неправильные детали сообщения, как показано ниже.
PubsubMessage{message=[123, 34, 101, 118, 101, 110, 116, 78, 97, 109, 101, 34, 58, 34, 109, 121, 115, 113, 108, 73, 110, 115, 101, 114, 116, -29, -126, -92, -29, -125, -103, -29, -125, -77, -29, -125, -120, 34, 44, 34, 110, 97, 109, 101, 115, 112, 97, 99, 101, 34, 58, 34, 114, 117, 108, 101, 47, -26, -128, -89, -24, -125, -67, -26, -92, -100, -24, -88, -68, -29, -125, -85, -29, -125, -68, -29, -125, -85, 34, 44, 34, 99, 114, 101, 97, 116, 101, 84, 105, 109, 101], attributes={}, messageId=:"2019-05-20T00:00:00.001","proper}
Кроме того, я получаю следующее исключение:
java.io.IOException: varint overflow 51498921187342211
org.apache.beam.sdk.util.VarInt.decodeInt(VarInt.java:58)
org.apache.beam.sdk.coders.StringUtf8Coder.readString(StringUtf8Coder.java:55)
org.apache.beam.sdk.coders.StringUtf8Coder.decode(StringUtf8Coder.java:100)
org.apache.beam.sdk.coders.StringUtf8Coder.decode(StringUtf8Coder.java:90)
org.apache.beam.sdk.coders.StringUtf8Coder.decode(StringUtf8Coder.java:37)
org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithMessageIdCoder.decode(PubsubMessageWithMessageIdCoder.java:51)
org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithMessageIdCoder.decode(PubsubMessageWithMessageIdCoder.java:33)
org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
org.apache.beam.runners.dataflow.worker.PubsubReader$PubsubReaderIterator.decodeMessage(PubsubReader.java:129)
org.apache.beam.runners.dataflow.worker.WindmillReaderIteratorBase.advance(WindmillReaderIteratorBase.java:57)
org.apache.beam.runners.dataflow.worker.WindmillReaderIteratorBase.start(WindmillReaderIteratorBase.java:43)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:361)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:194)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1324)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:151)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1053)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:748)
это мой код, чтобы получить сообщение из подписки.
PubsubIO.readMessagesWithMessageId().fromSubscription(options.getFromSubscription());
Я могу правильно получить подробности сообщения, когда я использую PubsubIO.readString (), но мне также нужен идентификатор сообщения.
Как я могу решить эту проблему?
С уважением