Я хочу создать таблицу KTable со связанным хранилищем состояний, которую можно запрашивать с помощью интерактивных запросов, например:
val builder = StreamsBuilder()
builder.table(CUSTOMERS_TOPIC, Materialized.`as`<String, Customer, KeyValueStore<Bytes, ByteArray>>(CUSTOMERS_STORE))
Однако, чтобы сериализовать мой класс значений Customer, мне нужно указать сериализатор Json. Я мог бы сделать это, используя этот метод из StreamsBuilder:
public synchronized <K, V> KTable<K, V> table(final String topic,
final Consumed<K, V> consumed) {
с этим потреблено:
Consumed.with(Serdes.String(), Serdes.serdeFrom(JsonPojoSerializer<Customer>(), JsonPojoDeserializer(Customer::class.java)
Но, как вы видите, нет способа установить название магазина. Более того, согласно javadoc, внутреннее хранилище состояний может не запрашиваться:
с внутренним названием магазина. Обратите внимание, что название магазина может не запрашиваться
через интерактивные запросы
Так как я могу настроить материализацию с указанным именем и указанием необходимого сериализатора Json?