У меня есть определенный вариант использования, где я пытаюсь записать данные о туберкулезе в Spanner. Мы извлекаем эти данные из DynamoDb и экспортируем эти данные в формате bzip2 в Google Cloud Storage. Таким образом, в основном у нас есть первичные идентификаторы в Spanner, и мы должны игнорировать уже существующие строки в Spanner. Поэтому я написал код ниже для достижения того же.
Mutation.WriteBuilder mutation = Mutation.newInsertBuilder(spannerTable.get());
Я написал конструктор вставок, так как не хочу обновлять существующую строку в Spanner. Используя приведенный ниже код для записи строки в Spanner, установите FailureMode.
results2.apply("Write Mutations to Spanner",SpannerIO.write()
.withInstanceId(spannerInstanceId)
.withDatabaseId(spannerDatabaseId)
//.withBatchSizeBytes(2000000)
//.withMaxNumMutations(maxNumMutations)
.withFailureMode(FailureMode.REPORT_FAILURES)
);
Но проблема с кодом заключается в том, что код потока данных повторяет весь пакет из-за мутации "ALREADY EXISTS". Я не могу использовать FailureMode.FAST_FAIL, потому что он останавливает весь конвейер. Я также попытался установить минимальное значение MaxNumMutation для уменьшения размера batch_size (в основном, для уменьшения вероятности записи «уже_экспертиза» в пакете мутаций), но общая производительность мешает. Так есть ли способ, которым я могу остановить механизм повтора для «уже существующей» записи мутации?