Исключение при попытке чтения из PubSub в потоке данных - PullRequest
1 голос
/ 31 января 2020

Я использую Apache Beam 2.18. Когда я запускаю свой код в DirectRunner, я могу получить messageId и подробности сообщения. Однако после того, как я развернул свой код в потоке данных, иногда я получаю неправильный messageId и неправильные детали сообщения, как показано ниже.

PubsubMessage{message=[123, 34, 101, 118, 101, 110, 116, 78, 97, 109, 101, 34, 58, 34, 109, 121, 115, 113, 108, 73, 110, 115, 101, 114, 116, -29, -126, -92, -29, -125, -103, -29, -125, -77, -29, -125, -120, 34, 44, 34, 110, 97, 109, 101, 115, 112, 97, 99, 101, 34, 58, 34, 114, 117, 108, 101, 47, -26, -128, -89, -24, -125, -67, -26, -92, -100, -24, -88, -68, -29, -125, -85, -29, -125, -68, -29, -125, -85, 34, 44, 34, 99, 114, 101, 97, 116, 101, 84, 105, 109, 101], attributes={}, messageId=:"2019-05-20T00:00:00.001","proper}

Кроме того, я получаю следующее исключение:

java.io.IOException: varint overflow 51498921187342211
        org.apache.beam.sdk.util.VarInt.decodeInt(VarInt.java:58)
        org.apache.beam.sdk.coders.StringUtf8Coder.readString(StringUtf8Coder.java:55)
        org.apache.beam.sdk.coders.StringUtf8Coder.decode(StringUtf8Coder.java:100)
        org.apache.beam.sdk.coders.StringUtf8Coder.decode(StringUtf8Coder.java:90)
        org.apache.beam.sdk.coders.StringUtf8Coder.decode(StringUtf8Coder.java:37)
        org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithMessageIdCoder.decode(PubsubMessageWithMessageIdCoder.java:51)
        org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithMessageIdCoder.decode(PubsubMessageWithMessageIdCoder.java:33)
        org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
        org.apache.beam.runners.dataflow.worker.PubsubReader$PubsubReaderIterator.decodeMessage(PubsubReader.java:129)
        org.apache.beam.runners.dataflow.worker.WindmillReaderIteratorBase.advance(WindmillReaderIteratorBase.java:57)
        org.apache.beam.runners.dataflow.worker.WindmillReaderIteratorBase.start(WindmillReaderIteratorBase.java:43)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:361)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:194)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
        org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1324)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:151)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1053)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:748)

это мой код, чтобы получить сообщение из подписки.

PubsubIO.readMessagesWithMessageId().fromSubscription(options.getFromSubscription());

Я могу правильно получить подробности сообщения, когда я использую PubsubIO.readString (), но мне также нужен идентификатор сообщения.

Как я могу решить эту проблему?

С уважением

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...