В настоящее время я пытаюсь обновить потоковое задание statefult, которое в основном выглядит следующим образом (используя версии 2.7 / 2.8 sdks):
pipeline
.apply("Read from pubsub", ...read from pubsub...)
.apply(
"Map PubSubMessages",
MapElements
.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptor.of(PubSubMessage.class)))
.via(input -> KV.of(input.getId1() + ":" + input.getId2(), input))
)
.apply(...stateful processing...)
PubSubMessage декодируется / кодируется с использованием автоматически сгенерированного класса:
registry.registerCoderForClass(PubSubMessage.class, ProtoCoder.of(PubSubMessage.class));
Проблема, с которой я столкнулся сейчас, заключается в том, что сообщение было обновлено.PubSubMessage
теперь имеет два дополнительных поля.Поскольку он proto3
, он обратно совместим.Однако, поскольку сгенерированный класс изменился, похоже, что поток данных считает, что тип также изменился, и старые сообщения / шаги больше не совместимы, и не может обновить задание следующим образом:
Рабочий процесс не выполнен.Причины: новое задание несовместимо с существующим заданием.Исходное задание не было прервано. Кодер или тип шага Map PubSubMessages для определения состояния объекта / Map.out0 / FromValue изменился.
Было бы несколько сложно остановитьработа (истощая свои сообщения) и перезапуская его заново.Есть ли лучший способ обновить тип / protobuf / кодер в этом случае?
Спасибо!