Я удалил копию шаблона Pub / Sub to BigQuery Dataflow из Google github репозитория . Я запускаю его на своей локальной машине, используя direct-runner .
В ходе тестирования я подтвердил, что шаблон записывает сбои в таблицу «deadletter» только в случае возникновения ошибки во время обработки UDF или преобразования из JSON в TableRow.
Я также хотел бы обрабатывать сбои, возникающие во время вставки в BigQuery, более изящно, отправляя их в отдельный тег TupleTag, чтобы их также можно было отправлять в таблицу deadletter или другой вывод для просмотра и обработки. В настоящее время при выполнении с dataflow-runner эти ошибки записываются только в журналы Stackdriver и продолжают повторяться до бесконечности, пока проблема не будет решена.
Вопрос первый : при локальном тестировании и публикации сообщения с форматом, не соответствующим схеме таблицы назначения, вставка повторяется 5 раз, а затем происходит сбой конвейера с RuntimeException и ошибкой, возвращаемой из HTTP ответ на API Google. Я считаю, что это поведение устанавливается в BigQueryServices.Impl здесь:
private static final FluentBackoff INSERT_BACKOFF_FACTORY =
FluentBackoff.DEFAULT.withInitialBackoff(Duration.millis(200)).withMaxRetries(5);
Однако на основе документации Google ,
"При работе в потоковом режиме комплект, включающий неисправный элемент
будет повторяться бесконечно, что может привести к
навсегда заглох. "
As Beam's Pub / Sub.IO ,
создавать и использовать неограниченные коллекции ПК
У меня сложилось впечатление, что режим потоковой передачи должен быть включен по умолчанию при чтении из Pub / Sub. Я даже дошел до добавления метода Streaming_Inserts при моем вызове writeTableRows (), и это не повлияло на это поведение.
.apply(
"WriteSuccessfulRecords",
BigQueryIO.writeTableRows()
.withMethod(Method.STREAMING_INSERTS)
- Это поведение каким-то образом зависит от того, какой я бегун?
с помощью? Если нет, то где же недостаток в моем понимании?
Вопрос второй :
- Есть ли разница в производительности при использовании BigQueryIO.write против BigQueryIO.writeTableRows ?
Я спрашиваю, потому что я не вижу, как я могу зафиксировать ошибки, связанные со вставкой, без создания собственного статического класса, который переопределяет метод расширения и использует ParDo и DoFn, где я могу добавить собственную настраиваемую логику для создания отдельных TupleTags для успешных записей. и записи об ошибках, аналогично тому, как это было сделано в JavascriptTextTransformer для FailsafeJavascriptUdf.
Обновление
public static PipelineResult run(DirectOptions options) {
options.setRunner(DirectRunner.class);
Pipeline pipeline = Pipeline.create(options);
// Register the coder for pipeline
FailsafeElementCoder<PubsubMessage, String> coder =
FailsafeElementCoder.of(PubsubMessageWithAttributesCoder.of(), StringUtf8Coder.of());
CoderRegistry coderRegistry = pipeline.getCoderRegistry();
coderRegistry.registerCoderForType(coder.getEncodedTypeDescriptor(), coder);
PCollectionTuple transformOut =
pipeline
//Step #1: Read messages in from Pub/Sub
.apply(
"ReadPubsubMessages",
PubsubIO.readMessagesWithAttributes().fromTopic(options.getInputTopic()))
//Step #2: Transform the PubsubMessages into TableRows
.apply("ConvertMessageToTableRow", new PubsubMessageToTableRow(options));
WriteResult writeResult = null;
try {
writeResult =
transformOut
.get(TRANSFORM_OUT)
.apply(
"WriteSuccessfulRecords",
BigQueryIO.writeTableRows()
.withMethod(Method.STREAMING_INSERTS)
.withoutValidation()
.withCreateDisposition(CreateDisposition.CREATE_NEVER)
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
.to("myproject:MyDataSet.MyTable"));
} catch (Exception e) {
System.out.print("Cause of the Standard Insert Failure is: ");
System.out.print(e.getCause());
}
try {
writeResult
.getFailedInserts()
.apply(
"WriteFailedInsertsToDeadLetter",
BigQueryIO.writeTableRows()
.to(options.getOutputDeadletterTable())
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_APPEND));
} catch (Exception e) {
System.out.print("Cause of the Error Insert Failure is: ");
System.out.print(e.getCause());
}
PCollectionList.of(transformOut.get(UDF_DEADLETTER_OUT))
.and(transformOut.get(TRANSFORM_DEADLETTER_OUT))
.apply("Flatten", Flatten.pCollections())
.apply(
"WriteFailedRecords",
WritePubsubMessageErrors.newBuilder()
.setErrorRecordsTable(
maybeUseDefaultDeadletterTable(
options.getOutputDeadletterTable(),
options.getOutputTableSpec(),
DEFAULT_DEADLETTER_TABLE_SUFFIX))
.setErrorRecordsTableSchema(getDeadletterTableSchemaJson())
.build());
return pipeline.run();
}
Ошибка:
Cause of the Error Insert Failure is: null[WARNING]
java.lang.NullPointerException: Outputs for non-root node WriteFailedInsertsToDeadLetter are null
at org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:864)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:672)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:311)
at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:245)
at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
at org.apache.beam.sdk.Pipeline.validate(Pipeline.java:575)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:310)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
at com.google.cloud.teleport.templates.PubSubToBigQuery.run(PubSubToBigQuery.java:312)
at com.google.cloud.teleport.templates.PubSubToBigQuery.main(PubSubToBigQuery.java:186)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:282)
at java.lang.Thread.run(Thread.java:748)