Я хочу использовать две темы Kafka с потоками Kafka, поддерживаемыми Spring Kafka. Темы имеют другой ключ и значение. Я хочу сопоставить ключ и значение из вторых топи c и merge
это с первым с помощью метода: .merge(KStream<X,Y> otherStream)
.
Вот пример:
// Block 1
KStream<MyKey, MyValue> stream2 = streamsBuilder.stream(
"second-topic",
consumedAs(OtherKey.class, OtherValue.class, AllowEmpty.NONE) // Provides default json Serde
).flatMap(
(key, value) -> {
List<KeyValue<MyKey, MyValue>> list = new ArrayList<>();
// Do stuff an fill out the list
return list;
});
// Block 2
KStream<MyKey, MyValue>[] branches = stream
.merge(stream2)
... business stuff
С этим решением я получаю ClassCastException
по той причине, что MyKey
нельзя привести к MyKey
. Причина в том, что они предоставляются разными модулями и загрузчиками классов. Ошибка происходит в сериализации, в блоке слияния. С transform(..)
у меня такое же поведение. Если я добавлю команду .through("tmp-topic")
все работает нормально. Похоже, материализация с помощью topi c возвращает действительный сериализуемый объект вместо flatMap(...)
.
Я обнаружил, что следующий API делает c в groupByKey
:
... Если перед этой операцией использовался оператор смены ключа (например, selectKey (KeyValueMapper), map (KeyValueMapper), flatMap (KeyValueMapper) или transform (TransformerSupplier, String ...)), и перераспределение данных не происходило после этого (например, через via (String)) в Кафке будет создана внутренняя переразметка topi c. Эта topi c будет называться «$ {applicationId} -XXX-repartition», где «applicationId» задается пользователем в StreamsConfig через параметр APPLICATION_ID_CONFIG, «XXX» - это внутреннее имя, а «-repartition» - фиксированное. суффикс. Вы можете получить все сгенерированные внутренние имена topi c через Topology.describe (). В этом случае все данные этого потока будут перераспределены через перераспределение topi c путем записи в него всех записей и перечитывания всех записей из него, так что результирующий KGroupedStream будет правильно разделен на ключ ...
Как видите, похоже, что после смены операций, таких как flatMap(...)
, рекомендуется записывать в топи c из-за сериализации и перераспределения.
Что вы делаете думаете об использовании through("topic")
, чтобы заставить его работать? Кто-нибудь знает, есть ли возможность материализоваться после flatMap(...)
без записи в топи c?
Версии
Версия Spring Kafka: 2.2.5. РЕЛИЗ
Apache Клиент Kafka: 2.0.1
Apache Потоки Кафки: 2.0.1