Для конкретного случая PubsubIO в потоке данных имейте в виду, что поток данных переопределяет PubsubIO и обрабатывает чтение и запись сообщений в Pubsub в рамках реализации потоковой передачи.Из-за этой замены я видел ту же ошибку, которую вы обсуждаете, в журналах под заголовком «shuffler», а не «worker».
Я обошел эту проблему, реализовав пользовательское преобразование перед PubsubIO.write () шаг.Это преобразование LimitPayloadSize просто проверяет, сколько байтов содержится в PubsubMessage, и пропускает только сообщения с полезной нагрузкой менее 7 МБ.
В настоящее время нет свободного API для обработки ошибок в преобразованиях, хотя это то, что обсуждалось,На данный момент принятым шаблоном является определение преобразования с несколькими выходными коллекциями, а затем запись коллекции сообщений о сбоях где-то еще (например, GCS через FileIO).Вы можете реализовать это как пустой DoFn или посмотреть раздел:
PCollectionList<PubsubMessage> limitedPayloads = input.apply("Limit payload size", Partition.of(2, new PartitionFn<PubsubMessage>)) {
public int partitionFor(PubsubMessage message, int numPartitions) {
return message.getPayload().size < 7 * 1000 * 1000 ? 0 : 1;
}
}));
limitedPayloads.get(0).apply(PubsubIO.write()...);
limitedPayloads.get(1).apply(FileIO.write()...);