Как создать таблицу KT, указав сериализатор json и имя магазина с его материализованным определением - PullRequest
0 голосов
/ 24 августа 2018

Я хочу создать таблицу KTable со связанным хранилищем состояний, которую можно запрашивать с помощью интерактивных запросов, например:

val builder = StreamsBuilder()
        builder.table(CUSTOMERS_TOPIC, Materialized.`as`<String, Customer, KeyValueStore<Bytes, ByteArray>>(CUSTOMERS_STORE))

Однако, чтобы сериализовать мой класс значений Customer, мне нужно указать сериализатор Json. Я мог бы сделать это, используя этот метод из StreamsBuilder:

public synchronized <K, V> KTable<K, V> table(final String topic,
                                                  final Consumed<K, V> consumed) {

с этим потреблено:

Consumed.with(Serdes.String(), Serdes.serdeFrom(JsonPojoSerializer<Customer>(), JsonPojoDeserializer(Customer::class.java)

Но, как вы видите, нет способа установить название магазина. Более того, согласно javadoc, внутреннее хранилище состояний может не запрашиваться:

с внутренним названием магазина. Обратите внимание, что название магазина может не запрашиваться через интерактивные запросы

Так как я могу настроить материализацию с указанным именем и указанием необходимого сериализатора Json?

1 Ответ

0 голосов
/ 24 августа 2018

В Scala API Serdes разрешаются через импликации.Поэтому в параметре Consumed отсутствует перегрузка.Срhttps://github.com/apache/kafka/blob/trunk/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala#L88-L129

Для Java существует перегрузка метода, которая позволяет передавать оба параметра:

public synchronized <K, V> KTable<K, V> table(final String topic,
                                              final Consumed<K, V> consumed,
                                              final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {

Cf.https://kafka.apache.org/20/javadoc/org/apache/kafka/streams/StreamsBuilder.html#table-java.lang.String-org.apache.kafka.streams.kstream.Consumed-org.apache.kafka.streams.kstream.Materialized-

...