KafkaStreams Как указать Serdes в потоковой агрегации? - PullRequest
0 голосов
/ 13 ноября 2018

Я работаю над приложением потоков Kafka, и у меня возникли проблемы с выяснением, как заставить работать агрегацию.

У меня есть KStream bankTransactions, где ключи имеют тип String и значениятипа JsonNode, поэтому я настроил Serdes моего приложения с помощью

// Definition of the different Serdes used in the streams
final Serde<String> stringSerde = Serdes.String();
final Serde<JsonNode> jsonSerde = new JsonSerde();
final Serde<Long> longSerde = Serdes.Long();

config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, stringSerde.getClass().getName());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, jsonSerde.getClass().getName());

. Я хочу объединить значения в KTable<String, Long>, где ключи будут одинаковыми, но значения будут Long s извлечены измой Json.

Итак, сначала я написал:

KTable<String, Long> totalBalances = bankTransactions
        .groupByKey()
        .aggregate(
                () -> 0L,
                (key, transaction, balance) -> (Long)((Long)balance + transaction.get("amount").asLong()),
                Materialized.as("bank-total-balance")
        );

И я получаю следующую ошибку во время выполнения:

Caused by: org.apache.kafka.streams.errors.StreamsException:
A serializer (value: org.apache.kafka.connect.json.JsonSerializer) is not compatible to
the actual value type (value type: java.lang.Long).
Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.

Я понимаю, что Кафка жалуется, потому что я 'Я пытаюсь использовать стандартные Json serdes для сериализации Long.Таким образом, читая из документа confluent Я попробовал это

KTable<String, Long> totalBalances = bankTransactions
        .groupByKey()
        .aggregate(
                () -> 0L,
                (key, transaction, balance) -> (Long)((Long)balance + transaction.get("amount").asLong()),
                Materialized.as("bank-total-balance").withValueSerde(Serdes.Long())
        );

Но затем я получаю ошибку при компиляции:

Error:(121, 89) java: incompatible types:
org.apache.kafka.common.serialization.Serde<java.lang.Long> cannot be converted
to org.apache.kafka.common.serialization.Serde<java.lang.Object>

Я пытался по-другому написать этот код (например, используя Serdes.long() вместо моего longSerdes, пытаясь параметризовать типы Materialize и даже пытаясь записать мой инициализатор и агрегатор как функцию, стиль Java 7), но я не могу понять, что я делаю неправильно.

Итак, мой вопрос прост: как правильно указать Serdes, которые aggregate должны использовать, если они не являются значениями по умолчанию Serdes?

1 Ответ

0 голосов
/ 13 ноября 2018

Кажется, правильный синтаксис следующий:

KTable<String, Long> totalBalances = bankTransactions
        .groupByKey()
        .aggregate(
                () -> 0L,
                (key, transaction, balance) -> (Long)((Long)balance + transaction.get("amount").asLong()),
                Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("bank-total-balances")
                        .withKeySerde(stringSerde)
                        .withValueSerde(longSerde)
        );

Три типа после Materialize. - это ключи, значения и хранилище, использованные для материализации KTable, и этот тип не должен меняться. Затем мы можем определить Serdes, используемый для записи в это хранилище значений ключей.

Примечание Я получил этот синтаксис из случайного репо, найденного на github, я все равно с удовольствием принял бы ответ с более точным ответом, подкрепленным некоторой документацией.

...