Мне нужно применить обработку ошибок к моему потоку данных для нескольких вставок в Spanner с одним и тем же первичным ключом. Логика заключается в том, что более старое сообщение может быть получено после текущего сообщения, и я не хочу перезаписывать сохраненные значения. Поэтому я создам свою мутацию как вставку и выдам ошибку при попытке повторной вставки.
Я видел несколько примеров блоков try внутри DoFn, которые записывают в сторонний вывод для регистрации любых ошибок. Это очень хорошее решение, но мне нужно применить обработку ошибок к шагу, который пишет в Spanner, который не содержит DoFn
spannerBranchTuples2.get(spannerOutput2)
.apply("Create Spanner Mutation", ParDo.of(createSpannerMutation))
.apply("Write Spanner Records", SpannerIO.write()
.withInstanceId(options.getSpannerInstanceId())
.withDatabaseId(options.getSpannerDatabaseId())
.grouped());
Я не нашел никакой документации, позволяющей применить обработку ошибок к этому шагу, или нашел способ переписать его как DoFn. Любые предложения, как применить обработку ошибок к этому? спасибо