Как обрабатывать ошибки вставки BigQuery в конвейерах потока данных с использованием Java? - PullRequest
0 голосов
/ 23 марта 2019

Я анализирую XML и пишу в Bigquery, используя конвейер потока данных. Как можно обработать ошибки, если в BigQuery произошла ошибка вставки? Я хочу написать собственный код для записи сбойного XML в корзину ошибок.

1 Ответ

0 голосов
/ 11 апреля 2019

Следующий код получает ошибочные строки при записи в bigquery:

TableRow row1 = new TableRow().set("name", "a").set("number", "1");
TableRow row2 = new TableRow().set("name", "b").set("number", "2");
TableRow row3 = new TableRow().set("name", "c").set("number", "error");    
PCollection<TableRow> failedRows =
        p.apply(Create.of(row1, row2, row3))
            .apply(
                BigQueryIO.writeTableRows()
                    .to("project-id:dataset-id.table-id")
                    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                    .withSchema(
                        new TableSchema()
                            .setFields(
                                ImmutableList.of(
                                    new TableFieldSchema().setName("name").setType("STRING"),
                                    new TableFieldSchema().setName("number").setType("INTEGER"))))             
            .getFailedInserts();
...