Обновление protobuf не проходит проверку совместимости в потоке данных Google - PullRequest
0 голосов
/ 27 ноября 2018

В настоящее время я пытаюсь обновить потоковое задание statefult, которое в основном выглядит следующим образом (используя версии 2.7 / 2.8 sdks):

pipeline
    .apply("Read from pubsub", ...read from pubsub...)
    .apply(
        "Map PubSubMessages",
        MapElements
            .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptor.of(PubSubMessage.class)))
            .via(input -> KV.of(input.getId1() + ":" + input.getId2(), input))
    )
    .apply(...stateful processing...)

PubSubMessage декодируется / кодируется с использованием автоматически сгенерированного класса:

registry.registerCoderForClass(PubSubMessage.class, ProtoCoder.of(PubSubMessage.class));

Проблема, с которой я столкнулся сейчас, заключается в том, что сообщение было обновлено.PubSubMessage теперь имеет два дополнительных поля.Поскольку он proto3, он обратно совместим.Однако, поскольку сгенерированный класс изменился, похоже, что поток данных считает, что тип также изменился, и старые сообщения / шаги больше не совместимы, и не может обновить задание следующим образом:

Рабочий процесс не выполнен.Причины: новое задание несовместимо с существующим заданием.Исходное задание не было прервано. Кодер или тип шага Map PubSubMessages для определения состояния объекта / Map.out0 / FromValue изменился.

Было бы несколько сложно остановитьработа (истощая свои сообщения) и перезапуская его заново.Есть ли лучший способ обновить тип / protobuf / кодер в этом случае?

Спасибо!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...