Apache Beam / Google Dataflow PubSub to BigQuery Pipeline: обработка ошибок вставки и неожиданное поведение при повторных попытках - PullRequest
0 голосов
/ 27 августа 2018

Я удалил копию шаблона Pub / Sub to BigQuery Dataflow из Google github репозитория . Я запускаю его на своей локальной машине, используя direct-runner .

В ходе тестирования я подтвердил, что шаблон записывает сбои в таблицу «deadletter» только в случае возникновения ошибки во время обработки UDF или преобразования из JSON в TableRow.

Я также хотел бы обрабатывать сбои, возникающие во время вставки в BigQuery, более изящно, отправляя их в отдельный тег TupleTag, чтобы их также можно было отправлять в таблицу deadletter или другой вывод для просмотра и обработки. В настоящее время при выполнении с dataflow-runner эти ошибки записываются только в журналы Stackdriver и продолжают повторяться до бесконечности, пока проблема не будет решена.

Вопрос первый : при локальном тестировании и публикации сообщения с форматом, не соответствующим схеме таблицы назначения, вставка повторяется 5 раз, а затем происходит сбой конвейера с RuntimeException и ошибкой, возвращаемой из HTTP ответ на API Google. Я считаю, что это поведение устанавливается в BigQueryServices.Impl здесь:

private static final FluentBackoff INSERT_BACKOFF_FACTORY =
        FluentBackoff.DEFAULT.withInitialBackoff(Duration.millis(200)).withMaxRetries(5);

Однако на основе документации Google ,

"При работе в потоковом режиме комплект, включающий неисправный элемент будет повторяться бесконечно, что может привести к навсегда заглох. "

As Beam's Pub / Sub.IO ,

создавать и использовать неограниченные коллекции ПК

У меня сложилось впечатление, что режим потоковой передачи должен быть включен по умолчанию при чтении из Pub / Sub. Я даже дошел до добавления метода Streaming_Inserts при моем вызове writeTableRows (), и это не повлияло на это поведение.

.apply(
            "WriteSuccessfulRecords",      
            BigQueryIO.writeTableRows()
                .withMethod(Method.STREAMING_INSERTS)
  1. Это поведение каким-то образом зависит от того, какой я бегун? с помощью? Если нет, то где же недостаток в моем понимании?

Вопрос второй :

  1. Есть ли разница в производительности при использовании BigQueryIO.write против BigQueryIO.writeTableRows ?

Я спрашиваю, потому что я не вижу, как я могу зафиксировать ошибки, связанные со вставкой, без создания собственного статического класса, который переопределяет метод расширения и использует ParDo и DoFn, где я могу добавить собственную настраиваемую логику для создания отдельных TupleTags для успешных записей. и записи об ошибках, аналогично тому, как это было сделано в JavascriptTextTransformer для FailsafeJavascriptUdf.

Обновление

public static PipelineResult run(DirectOptions options) {

options.setRunner(DirectRunner.class);

    Pipeline pipeline = Pipeline.create(options);

    // Register the coder for pipeline
    FailsafeElementCoder<PubsubMessage, String> coder =
        FailsafeElementCoder.of(PubsubMessageWithAttributesCoder.of(), StringUtf8Coder.of());

    CoderRegistry coderRegistry = pipeline.getCoderRegistry();
    coderRegistry.registerCoderForType(coder.getEncodedTypeDescriptor(), coder);

     PCollectionTuple transformOut =
        pipeline
             //Step #1: Read messages in from Pub/Sub
            .apply(
                "ReadPubsubMessages",
  PubsubIO.readMessagesWithAttributes().fromTopic(options.getInputTopic()))

             //Step #2: Transform the PubsubMessages into TableRows
            .apply("ConvertMessageToTableRow", new PubsubMessageToTableRow(options));

    WriteResult writeResult = null;

    try {
      writeResult = 
            transformOut
        .get(TRANSFORM_OUT)
        .apply(
            "WriteSuccessfulRecords",      
            BigQueryIO.writeTableRows()
                .withMethod(Method.STREAMING_INSERTS)
                .withoutValidation()
                .withCreateDisposition(CreateDisposition.CREATE_NEVER)
                .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
                .to("myproject:MyDataSet.MyTable"));
    } catch (Exception e) {
        System.out.print("Cause of the Standard Insert Failure is: ");
        System.out.print(e.getCause());
    }

    try {
        writeResult
            .getFailedInserts()
            .apply(
                    "WriteFailedInsertsToDeadLetter",
                    BigQueryIO.writeTableRows()
                        .to(options.getOutputDeadletterTable())
                        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                        .withWriteDisposition(WriteDisposition.WRITE_APPEND));
    } catch (Exception e) {
        System.out.print("Cause of the Error Insert Failure is: ");
        System.out.print(e.getCause());
    }

     PCollectionList.of(transformOut.get(UDF_DEADLETTER_OUT))
        .and(transformOut.get(TRANSFORM_DEADLETTER_OUT))
        .apply("Flatten", Flatten.pCollections())
        .apply(
            "WriteFailedRecords",
            WritePubsubMessageErrors.newBuilder()
                .setErrorRecordsTable(
                    maybeUseDefaultDeadletterTable(
                        options.getOutputDeadletterTable(),
                        options.getOutputTableSpec(),
                        DEFAULT_DEADLETTER_TABLE_SUFFIX))
                .setErrorRecordsTableSchema(getDeadletterTableSchemaJson())
                .build());

    return pipeline.run();
  }

Ошибка:

Cause of the Error Insert Failure is: null[WARNING] 
java.lang.NullPointerException: Outputs for non-root node WriteFailedInsertsToDeadLetter are null
    at org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:864)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:672)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:311)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:245)
    at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
    at org.apache.beam.sdk.Pipeline.validate(Pipeline.java:575)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:310)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
    at com.google.cloud.teleport.templates.PubSubToBigQuery.run(PubSubToBigQuery.java:312)
    at com.google.cloud.teleport.templates.PubSubToBigQuery.main(PubSubToBigQuery.java:186)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:282)
    at java.lang.Thread.run(Thread.java:748)

1 Ответ

0 голосов
/ 27 августа 2018

В последних версиях Beam преобразование BigQueryIO.Write возвращает объект WriteResult , который позволяет извлекать PCollection TableRows, которые не удалось вывести в BigQuery. Используя это, вы можете легко извлекать сбои, форматировать их в структуре выходных данных и отправлять записи в BigQuery. Это устраняет необходимость в отдельном классе для управления успешными и неудачными записями.

Ниже приведен пример того, как это может выглядеть для вашего конвейера.

// Attempt to write the table rows to the output table.
WriteResult writeResult =
    pipeline.apply(
        "WriteRecordsToBigQuery",
        BigQueryIO.writeTableRows()
            .to(options.getOutputTable())
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));

/*
 * 1) Get the failed inserts
 * 2) Transform to the deadletter table format.
 * 3) Output to the deadletter table.
*/
writeResult
  .getFailedInserts()
    .apply("FormatFailedInserts", ParDo.of(new FailedInsertFormatter()))
    .apply(
        "WriteFailedInsertsToDeadletter",
        BigQueryIO.writeTableRows()
            .to(options.getDeadletterTable())
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND));

Дополнительно, чтобы ответить на ваши вопросы:

  1. Согласно лучу Документы , вы должны установить streaming опция для true для DirectRunner.
  2. Там не должно быть разница в производительности В любом случае вам нужно будет преобразовать вводить записи в TableRow объектов. Это не должно иметь никакого значения если вы делаете это в ParDo заранее или в сериализуемом функция с использованием BigQueryIO.Write.withFormatFunction .
...