Мы создали довольно простой конвейер для обработки паб-событий. Полезная нагрузка паб-сообщения сама по себе представляет собой данные CSV с разделителями табуляции
После прочтения сообщения данные полезной нагрузки усекаются при накачке обратно в объект события. Используя прямой бегун и работая локально, конвейер работает от начала до конца.
Это только при запуске в средстве бега Облачный поток данных Google, где мы видим, что данные этого сообщения усечены?
// Create the pipeline
Pipeline pipeline = Pipeline.create(options);
LOG.info("Reading from subscription: " + options.getInputSubscription());
//Step #1: Read from a PubSub subscription.
PCollection<PubsubMessage> pubsubMessages = pipeline.apply(
"ReadPubSubSubscription",
PubsubIO.readMessagesWithMessageId()
.fromSubscription(options.getInputSubscription())
);
//Step #2: Transform the PubsubMessages into snowplow events.
PCollection<Event> rawEvents = pubsubMessages.apply(
"ConvertMessageToEvent",
ParDo.of(new PubsubMessageEventFn())
);
// other pipeline functions.....
Здесь функция преобразования, где для каждого дополнительного сообщения паба попадали в ошибку дело. Обратите внимание, что Event.parse () на самом деле является библиотекой scala, но я не понимаю, как это может повлиять на это, поскольку сами данные сообщения - это то, что было усечено между двумя этапами конвейера.
Возможно, есть проблема с кодировкой?
public static class PubsubMessageEventFn extends DoFn<PubsubMessage, Event> {
@ProcessElement
public void processElement(ProcessContext context) {
PubsubMessage message = context.element();
Validated<ParsingError, Event> event = Event.parse(new String(message.getPayload()));
Either<ParsingError, Event> condition = event.toEither();
if (condition.isLeft()) {
ParsingError err = condition.left().get();
LOG.error("Event parsing error: " + err.toString() + " for message: " + new String(message.getPayload()));
} else {
Event e = condition.right().get();
context.output(e);
}
}
}
Вот пример данных, которые передаются в сообщении журнала:
Event parsing error: FieldNumberMismatch(5) for message: 4f6ec25-67a7-4edf-972a-29e80320f67f web 2020-04-14 21:26:40.034 2020-04-14 21:26:39.884 2020-04-1