Я не совсем уверен, что вы делаете, но похоже, что вы хотите:
- Считать некоторые существующие данные с ключом (или частичным ключом), соответствующим сообщению pubsub
- Удалить эти данные
- Вставить новые данные из сообщения pubsub
Один из вариантов - создать DoFn
, который выполняет чтение / удаление / вставку (или чтение /обновление) в транзакции чтения-записи. Это сохранит согласованность БД ...
Используйте SpannerIO.WriteFn в качестве модели - вам нужно установить SpannerAccessor
как переходный процесс и создать / удалить его в @Setup
и @Teardown
обработчики событий
Обработчик @ProcessElement
вашего DoFn
создаст транзакцию чтения-записи , внутри которой вы будете читать строки для ключа, обновлять или удалятьих, а затем вставляет новый элемент (ы).
Недостатком этого метода является то, что для каждой транзакции Spanner будет обрабатываться только одно сообщение Pub / Sub (если на предыдущем шаге вы не сделаете что-то умное, например, группируете их), и это сложная транзакция чтения-записи. ,Если ваша скорость передачи сообщений / сек относительно низкая, это было бы хорошо, но если нет, то этот метод будет гораздо больше загружать вашу БД.
Второй вариант - использовать слепое удаление диапазона ключей. ,Это может работать только в том случае, если object_id является первой частью составного ключа (который, похоже, из вашего кода).
Вы должны создать MutationGroup
, содержащую мутацию удаления, которая удаляет вслепую любые существующие строки, ключи которых начинаются с object_id, используя мутацию Delete с диапазоном клавиш, а затем вставляются мутации для замены удаленных строк.
MutationGroup.create(
// Delete rows with key starting with object_id.
Mutation.delete("TableName", KeySet.newBuilder()
.addRange(
KeyRange.closedClosed(
Key.of(str.getString("object_id")),
Key.of(str.getString("object_id"))))
.build()),
// Insert replacement rows.
Mutation.newInsertBuilder("TableName")
.set("column").to("value"),
...
.build(),
Mutation.newInsertBuilder("TableName")
...);
Затем он будет передан SpannerIO.write (). Grouped (), как и раньше, чтобы их можно было объединить для повышения эффективности.