Можно ли перехватить отсутствующий набор данных java .lang.RuntimeException в конвейере потоков данных Google Cloud, который пишет из Pub / Sub в BigQuery? - PullRequest
1 голос
/ 04 марта 2020

Я пытаюсь обработать ошибки, из-за которых мое задание Dataflow пытается динамически записать в назначения таблицы BigQuery.

Я хотел бы перехватить следующее исключение:

java .lang.RuntimeException: не удалось получить набор данных для набора данных example_dataset в проекте example_project

, чтобы создать набор данных и затем повторить запись в BigQuery.

Можно ли таким образом перехватывать исключения, и если да, знаете ли вы, где мне нужно было бы добавить логи try / catch c в мой код?

Ответы [ 2 ]

1 голос
/ 08 марта 2020

Вы не можете обработать этот сценарий, используя блок try-catch, так как это внутренняя ошибка API BQ. Скорее, я бы предложил вам написать политику Retry Transient и установить тип ошибки. Таким образом, вы можете сохранить результат ошибки записи BigQuery в PCollection и затем вывести эту запись как ваш wi sh. Пожалуйста, обратитесь к приведенному ниже фрагменту, чтобы добиться того же.

WriteResult result = formattedData.get(successRows).setCoder(TableRowJsonCoder.of()).apply("BQ SteamingInserts",
                BigQueryIO.writeTableRows().withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
                        .to("audit.db_audit")
                        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                        .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()).withoutValidation()
                        .withExtendedErrorInfo());

Используя приведенный выше фрагмент кода, если что-то не получается из-за операций ddl, данные будут сохранены в WriteResult.

PCollection<String> failedInserts = result.getFailedInsertsWithErr().apply("BQErrorToTableRow",
                ParDo.of(new BQErrorToString()));

Вы можете получить неудачная запись с использованием приведенного выше фрагмента кода. Дайте мне знать, если это поможет:)

1 голос
/ 04 марта 2020

Несуществующие наборы данных и / или таблицы BigQuery будут повторяться бесконечно и могут привести к зависанию конвейера. BigQueryIO не имеет настраиваемой опции для автоматического создания несуществующих наборов данных BigQuery, у него есть только опция для создания несуществующих таблиц BigQuery, но указанный ресурс набора данных должен существовать или быть создан до вызова writing to table кода.

Я также нашел в документации Beam , в которой делается вывод, что

набор данных, в который выполняется запись, должен уже существовать

Пожалуйста, обратитесь к официальная документация и посмотрите, как Java исключения обрабатываются в облачном потоке данных, и посмотрите примеры .

Служба потока данных повторяет сбойные задачи до 4 раз в пакетном режиме и неограниченное количество раз в потоковом режиме. В пакетном режиме ваша работа не будет выполнена, а в потоковом режиме она может зависнуть на неопределенное время.

Надеюсь, это поможет.

...