Apache луч, похоже, усекает полезную нагрузку паб-сообщения - PullRequest
0 голосов
/ 15 апреля 2020

Мы создали довольно простой конвейер для обработки паб-событий. Полезная нагрузка паб-сообщения сама по себе представляет собой данные CSV с разделителями табуляции

После прочтения сообщения данные полезной нагрузки усекаются при накачке обратно в объект события. Используя прямой бегун и работая локально, конвейер работает от начала до конца.

Это только при запуске в средстве бега Облачный поток данных Google, где мы видим, что данные этого сообщения усечены?

        // Create the pipeline
        Pipeline pipeline = Pipeline.create(options);

        LOG.info("Reading from subscription: " + options.getInputSubscription());

        //Step #1: Read from a PubSub subscription.
        PCollection<PubsubMessage> pubsubMessages = pipeline.apply(
            "ReadPubSubSubscription",
            PubsubIO.readMessagesWithMessageId()
                    .fromSubscription(options.getInputSubscription())
        );


        //Step #2: Transform the PubsubMessages into snowplow events.
        PCollection<Event> rawEvents = pubsubMessages.apply(
            "ConvertMessageToEvent",
            ParDo.of(new PubsubMessageEventFn())
        );

        // other pipeline functions.....

Здесь функция преобразования, где для каждого дополнительного сообщения паба попадали в ошибку дело. Обратите внимание, что Event.parse () на самом деле является библиотекой scala, но я не понимаю, как это может повлиять на это, поскольку сами данные сообщения - это то, что было усечено между двумя этапами конвейера.

Возможно, есть проблема с кодировкой?


    public static class PubsubMessageEventFn extends DoFn<PubsubMessage, Event> {
        @ProcessElement
        public void processElement(ProcessContext context) {
            PubsubMessage message = context.element();
            Validated<ParsingError, Event> event = Event.parse(new String(message.getPayload()));
            Either<ParsingError, Event> condition = event.toEither();
            if (condition.isLeft()) {
                ParsingError err = condition.left().get();
                LOG.error("Event parsing error: " + err.toString() + " for message: " + new String(message.getPayload()));
            } else {
                Event e = condition.right().get();
                context.output(e);
            }
        }
    }

Вот пример данных, которые передаются в сообщении журнала:

Event parsing error: FieldNumberMismatch(5) for message: 4f6ec25-67a7-4edf-972a-29e80320f67f web 2020-04-14 21:26:40.034 2020-04-14 21:26:39.884 2020-04-1

1 Ответ

2 голосов
/ 15 апреля 2020

Обратите внимание, что реализация Pub / Sub для DirectRunner отличается от реализации в Dataflow Runner, как описано здесь - https://cloud.google.com/dataflow/docs/concepts/streaming-with-cloud-pubsub#integration -features .

Я полагаю, что проблема связана с кодировкой, поскольку message.getPayload имеет тип bytes, и код может потребоваться изменить как new String(message.getPayload(), StandardCharsets.UTF_8) в строке ниже

Validated<ParsingError, Event> event = Event.parse(new String(message.getPayload(), StandardCharsets.UTF_8));
...