Материализация кафки ручья после плоской карты - PullRequest
1 голос
/ 20 апреля 2020

Я хочу использовать две темы Kafka с потоками Kafka, поддерживаемыми Spring Kafka. Темы имеют другой ключ и значение. Я хочу сопоставить ключ и значение из вторых топи c и merge это с первым с помощью метода: .merge(KStream<X,Y> otherStream).

Вот пример:

    // Block 1
    KStream<MyKey, MyValue> stream2 = streamsBuilder.stream(
        "second-topic",
        consumedAs(OtherKey.class, OtherValue.class, AllowEmpty.NONE) // Provides default json Serde
    ).flatMap(
        (key, value) -> {
            List<KeyValue<MyKey, MyValue>> list = new ArrayList<>();
            // Do stuff an fill out the list
            return list;
        });

    // Block 2
    KStream<MyKey, MyValue>[] branches = stream
        .merge(stream2)
        ... business stuff

С этим решением я получаю ClassCastException по той причине, что MyKey нельзя привести к MyKey. Причина в том, что они предоставляются разными модулями и загрузчиками классов. Ошибка происходит в сериализации, в блоке слияния. С transform(..) у меня такое же поведение. Если я добавлю команду .through("tmp-topic") все работает нормально. Похоже, материализация с помощью topi c возвращает действительный сериализуемый объект вместо flatMap(...).

Я обнаружил, что следующий API делает c в groupByKey:

... Если перед этой операцией использовался оператор смены ключа (например, selectKey (KeyValueMapper), map (KeyValueMapper), flatMap (KeyValueMapper) или transform (TransformerSupplier, String ...)), и перераспределение данных не происходило после этого (например, через via (String)) в Кафке будет создана внутренняя переразметка topi c. Эта topi c будет называться «$ {applicationId} -XXX-repartition», где «applicationId» задается пользователем в StreamsConfig через параметр APPLICATION_ID_CONFIG, «XXX» - это внутреннее имя, а «-repartition» - фиксированное. суффикс. Вы можете получить все сгенерированные внутренние имена topi c через Topology.describe (). В этом случае все данные этого потока будут перераспределены через перераспределение topi c путем записи в него всех записей и перечитывания всех записей из него, так что результирующий KGroupedStream будет правильно разделен на ключ ...

Как видите, похоже, что после смены операций, таких как flatMap(...), рекомендуется записывать в топи c из-за сериализации и перераспределения.

Что вы делаете думаете об использовании through("topic"), чтобы заставить его работать? Кто-нибудь знает, есть ли возможность материализоваться после flatMap(...) без записи в топи c?

Версии
Версия Spring Kafka: 2.2.5. РЕЛИЗ
Apache Клиент Kafka: 2.0.1
Apache Потоки Кафки: 2.0.1

1 Ответ

2 голосов
/ 20 апреля 2020

Просто для некоторого контекста, всякий раз, когда вы используете операцию смены ключа, любые процессоры ниже по потоку, использующие новый ключ, инициируют создание раздела перераспределения c. Перераспределение topi c гарантирует, что новый ключ находится в правильном разделе. Я знаю, что вы, наверное, уже это знаете, но я просто повторяю эту мысль для ясности.

Имея это в виду, вполне приемлемо выполнить операцию through() после того, как вы изменили ключ, потому что это то, что Kafka Streams все равно подойдет под одеяло.

Так что flatMap(...).through(someTopic) работает нормально.

Кроме того, таким образом вы также исключаете возможность нескольких перераспределений, если повторно используете экземпляр KStream с измененным ключом в других операциях (соединениях, агрегациях) в нисходящем направлении.

HTH,

Билл

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...