Операция сопоставления через KStream завершается неудачно, если не указать стандартные значения по умолчанию и использовать пользовательские -> org.apache.kafka.streams.errors.StreamsException - PullRequest
0 голосов
/ 26 августа 2018

Поскольку я работаю со значениями Json, я не установил значения по умолчанию.

Я обрабатываю KStream, потребляя его с необходимыми пружинами и сортами продукта (json), но следующий шаг (операция отображения) завершается неудачно:

val props = Properties()
props[StreamsConfig.APPLICATION_ID_CONFIG] = applicationName
props[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = kafkaBootstrapServers

val productSerde: Serde<Product> = Serdes.serdeFrom(JsonPojoSerializer<Product>(), JsonPojoDeserializer(Product::class.java))

builder.stream(INVENTORY_TOPIC, Consumed.with(Serdes.String(), productSerde))
            .map { key, value ->
                KeyValue(key, XXX)
            }
            .aggregate(...)

Если я удаляю операцию карты, выполнение идет нормально.

Я не нашел способа указать serdes для карты (), как это сделать?

Ошибка:

Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.ByteArraySerializer / value: org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: com.codependent.kafkastreams.inventory.dto.Product). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:92)

1 Ответ

0 голосов
/ 26 августа 2018

Многочисленные выпуски:

  1. После того, как вы позвоните map(), вы позвоните groupByKey().aggregate().Это вызывает перераспределение данных и, таким образом, после map() данные записываются во внутренний раздел для перераспределения данных.Следовательно, вам необходимо указать также Serde в пределах groupByKey().

  2. Однако, поскольку вы не модифицируете ключ, вы должны вместо этого вызвать mapValues(),чтобы избежать ненужного перераспределения.

  3. Обратите внимание, что вам нужно предоставить Serde s для каждого оператора, который не должен использовать значение по умолчанию Serde из конфигурации.Serde s не передаются вниз по потоку, но перезаписываются на месте оператора.(В настоящее время Kafka 2.1 работает над улучшением этого.)

...