Обработка ошибок записи выходных данных Apache Beam в GCP - PullRequest
0 голосов
/ 02 июля 2018

Мне нужно применить обработку ошибок к моему потоку данных для нескольких вставок в Spanner с одним и тем же первичным ключом. Логика заключается в том, что более старое сообщение может быть получено после текущего сообщения, и я не хочу перезаписывать сохраненные значения. Поэтому я создам свою мутацию как вставку и выдам ошибку при попытке повторной вставки.

Я видел несколько примеров блоков try внутри DoFn, которые записывают в сторонний вывод для регистрации любых ошибок. Это очень хорошее решение, но мне нужно применить обработку ошибок к шагу, который пишет в Spanner, который не содержит DoFn

spannerBranchTuples2.get(spannerOutput2)
    .apply("Create Spanner Mutation", ParDo.of(createSpannerMutation))                      
    .apply("Write Spanner Records", SpannerIO.write()
        .withInstanceId(options.getSpannerInstanceId())                  
        .withDatabaseId(options.getSpannerDatabaseId())
        .grouped());

Я не нашел никакой документации, позволяющей применить обработку ошибок к этому шагу, или нашел способ переписать его как DoFn. Любые предложения, как применить обработку ошибок к этому? спасибо

1 Ответ

0 голосов
/ 04 июля 2018

Существует интересный шаблон для этого в документации Dataflow .

По сути, идея состоит в том, чтобы иметь DoFn перед отправкой результатов в ваши письменные преобразования. Это выглядело бы примерно так:

final TupleTag<Output> successTag = new TupleTag<>() {};
final TupleTag<Input> deadLetterTag = new TupleTag<>() {};
PCollection<Input> input = /* … */;
PCollectionTuple outputTuple = input.apply(ParDo.of(new DoFn<Input, Output>() {
  @Override
  void processElement(ProcessContext c) {
  try {
    c.output(process(c.element());
  } catch (Exception e) {
    LOG.severe("Failed to process input {} -- adding to dead letter file",
      c.element(), e);
    c.sideOutput(deadLetterTag, c.element());
  }
}).withOutputTags(successTag, TupleTagList.of(deadLetterTag)));

outputTuple.get(deadLetterTag)
  .apply(/* Write to a file or table or anything */);

outputTuple.get(successTag)
  .apply(/* Write to Spanner or any other sink */);

Дайте мне знать, если это полезно!

...