Я работаю над приложением потоков Kafka, и у меня возникли проблемы с выяснением, как заставить работать агрегацию.
У меня есть KStream bankTransactions
, где ключи имеют тип String
и значениятипа JsonNode
, поэтому я настроил Serdes моего приложения с помощью
// Definition of the different Serdes used in the streams
final Serde<String> stringSerde = Serdes.String();
final Serde<JsonNode> jsonSerde = new JsonSerde();
final Serde<Long> longSerde = Serdes.Long();
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, stringSerde.getClass().getName());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, jsonSerde.getClass().getName());
. Я хочу объединить значения в KTable<String, Long>
, где ключи будут одинаковыми, но значения будут Long
s извлечены измой Json.
Итак, сначала я написал:
KTable<String, Long> totalBalances = bankTransactions
.groupByKey()
.aggregate(
() -> 0L,
(key, transaction, balance) -> (Long)((Long)balance + transaction.get("amount").asLong()),
Materialized.as("bank-total-balance")
);
И я получаю следующую ошибку во время выполнения:
Caused by: org.apache.kafka.streams.errors.StreamsException:
A serializer (value: org.apache.kafka.connect.json.JsonSerializer) is not compatible to
the actual value type (value type: java.lang.Long).
Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
Я понимаю, что Кафка жалуется, потому что я 'Я пытаюсь использовать стандартные Json serdes для сериализации Long
.Таким образом, читая из документа confluent Я попробовал это
KTable<String, Long> totalBalances = bankTransactions
.groupByKey()
.aggregate(
() -> 0L,
(key, transaction, balance) -> (Long)((Long)balance + transaction.get("amount").asLong()),
Materialized.as("bank-total-balance").withValueSerde(Serdes.Long())
);
Но затем я получаю ошибку при компиляции:
Error:(121, 89) java: incompatible types:
org.apache.kafka.common.serialization.Serde<java.lang.Long> cannot be converted
to org.apache.kafka.common.serialization.Serde<java.lang.Object>
Я пытался по-другому написать этот код (например, используя Serdes.long()
вместо моего longSerdes
, пытаясь параметризовать типы Materialize
и даже пытаясь записать мой инициализатор и агрегатор как функцию, стиль Java 7), но я не могу понять, что я делаю неправильно.
Итак, мой вопрос прост: как правильно указать Serdes, которые aggregate
должны использовать, если они не являются значениями по умолчанию Serdes?