StateStore никогда не добавляется в облако Spring - PullRequest
0 голосов
/ 17 марта 2020

Любая справка, как добавить хранилище состояний в облачное хранилище Spring

Я всегда получаю эту ошибку "Вложенное исключение - org.springframework.kafka.KafkaException: Не удалось запустить поток:; Вложенное исключение - org. apache .kafka.streams.errors.TopologyException: неверная топология: myStore StateStore еще не добавлен. "

Вот определение компонента, однако оно никогда не работает

@Bean
  public StoreBuilder storeBuilder() {
    KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore("mystore");
    StoreBuilder<KeyValueStore<String, MyData>> storeBuilder = Stores.keyValueStoreBuilder(storeSupplier, Serdes.String(), StreamsSerde.MyDataSerde());
    return storeBuilder;
  }

Вот Serde

public static final class MyDataSerde extends Serdes.WrapperSerde<MyData> {
    public MyDataSerde() {
      super(new JsonSerializer<>(), new JsonDeserializer<>(MyData.class));
    }
  }

Вот класс данных

public class MyData {
  private String name;
  private String course;
}

Вот зависимости весеннего облака

springBootVersion = "2.2.5.RELEASE"
set('springCloudVersion', "Hoxton.SR3")

implementation group:"org.springframework.cloud", name: "spring-cloud-stream"
    implementation group: "org.springframework.cloud", name: "spring-cloud-stream-binder-kafka-streams"
    implementation group: "org.springframework.cloud", name: "spring-cloud-starter-stream-kafka" 

Ответы [ 2 ]

1 голос
/ 19 марта 2020

Я нашел решение для добавления магазина программно в этой статье

1 голос
/ 17 марта 2020

Вам нужно добавить хранилища состояний, подобные этому, когда вам нужно использовать низкоуровневый API-интерфейс процессора или преобразователя. Вы пытались добавить хранилище состояний к вашему вызову метода process или transform? Вот тест , который работает. Посмотрите на вызов process и то, как передаются государственные магазины.

...