PubsubIO, сообщение превышает максимальный размер, как выполнить обработку ошибок - PullRequest
0 голосов
/ 15 января 2019

Мы запускаем конвейер в потоке данных GCP и достигаем максимального размера сообщения pubsub [1] Когда это произойдет, время задержки в трубопроводе начнет нарастать, в конечном итоге останавливаясь ...

Это сообщение журнала было создано в стековом драйвере GCP в разделе «dataflow_step»,

Мой вопрос, есть ли способ определить обработку ошибок в конвейере ...

.apply(PubsubIO.writeMessages()
                        .to("topic")
                        .withTimestampAttribute(Instant.now().toString()));

с чем-то вроде

.onError(...perform error handling ...)

В той же манере, что и Java8 streams api. что позволило бы конвейеру продолжить с выходными данными, которые находятся в пределах pubsub.

Другие решения для решения этой ситуации приветствуются.

Спасибо, Кристоф Буйер

[1] Не удалось зафиксировать запрос из-за ошибки проверки: generic :: invalid_argument: запросы публикации Pubsub ограничены 10 МБ, отклонение сообщения более 7 МБ во избежание превышения лимита при кодировании запроса byte64.

1 Ответ

0 голосов
/ 22 января 2019

Для конкретного случая PubsubIO в потоке данных имейте в виду, что поток данных переопределяет PubsubIO и обрабатывает чтение и запись сообщений в Pubsub в рамках реализации потоковой передачи.Из-за этой замены я видел ту же ошибку, которую вы обсуждаете, в журналах под заголовком «shuffler», а не «worker».

Я обошел эту проблему, реализовав пользовательское преобразование перед PubsubIO.write () шаг.Это преобразование LimitPayloadSize просто проверяет, сколько байтов содержится в PubsubMessage, и пропускает только сообщения с полезной нагрузкой менее 7 МБ.

В настоящее время нет свободного API для обработки ошибок в преобразованиях, хотя это то, что обсуждалось,На данный момент принятым шаблоном является определение преобразования с несколькими выходными коллекциями, а затем запись коллекции сообщений о сбоях где-то еще (например, GCS через FileIO).Вы можете реализовать это как пустой DoFn или посмотреть раздел:

PCollectionList<PubsubMessage> limitedPayloads = input.apply("Limit payload size", Partition.of(2, new PartitionFn<PubsubMessage>)) {
  public int partitionFor(PubsubMessage message, int numPartitions) {
    return message.getPayload().size < 7 * 1000 * 1000 ? 0 : 1;
  }
}));
limitedPayloads.get(0).apply(PubsubIO.write()...);
limitedPayloads.get(1).apply(FileIO.write()...);
...