Потоки Кафки, не использующие serde после перераспределения - PullRequest
0 голосов
/ 06 декабря 2018

Приложение My Kafka Streams использует тему kafka, которая использует следующую раскладку ключ-значение: String.class -> HistoryEvent.class

При печати моей текущей темы это может быть подтверждено:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flow-event-stream-file-service-test-instance --property print.key=true --property key.separator=" -- " --from-beginning
flow1 --  SUCCESS     #C:\Daten\file-service\in\crypto.p12

«flow1» - это ключ String, а часть после -- - это сериализованное значение.

Мой поток настроен так:

    KStream<String, HistoryEvent> eventStream = builder.stream(applicationTopicName, Consumed.with(Serdes.String(),
            historyEventSerde));


    eventStream.selectKey((key, value) -> new HistoryEventKey(key, value.getIdentifier()))
            .groupByKey()
            .reduce((e1, e2) -> e2,
                    Materialized.<HistoryEventKey, HistoryEvent, KeyValueStore<Bytes, byte[]>>as(streamByKeyStoreName)
                            .withKeySerde(new HistoryEventKeySerde()));

Итак, насколько язнаете, я говорю, чтобы использовать тему, используя String и HistoryEvent serde, поскольку это то, что в теме.Затем я «перезаписываю» его, чтобы использовать комбинированный ключ, который должен храниться локально, используя предоставленный serde для HistoryEventKey.class.Насколько я понимаю, это приведет к созданию дополнительной темы (можно увидеть в списке тем в контейнере kafka) с новым ключом.Это нормально.

Теперь проблема в том, что приложение не может запуститься даже из чистой среды только с одним документом в теме:

org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=flow-event-stream-file-service-test-instance, partition=0, offset=0
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: HistoryEventSerializer) is not compatible to the actual key or value type (key type: HistoryEventKey / value type: HistoryEvent). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.

Сложно сказатьиз сообщения, где именно проблема.В моей базовой теме написано, что это невозможно, так как там нет ключа типа HistoryEventKey.Так как я предоставил serde для HistoryEventKey в reduce, он также не может быть в местном хранилище.

Единственное, что имеет смысл для меня, это то, что он связан с операцией selectKey, котораявызывает перестановку и новую тему.Однако я не могу понять, как я могу предоставить serde для этой операции.Я не хочу устанавливать его как значение по умолчанию, потому что это не стандартная клавиша serde.

1 Ответ

0 голосов
/ 06 декабря 2018

После некоторой отладки выполнения я смог выяснить, что новая тема создается на шаге groupByKey.Вы можете предоставить экземпляр Grouped, который дает возможность указать Serde, используемый для ключа и значения:

    eventStream.selectKey((key, value) -> new HistoryEventKey(key, value.getIdentifier()))
            .groupByKey(Grouped.<HistoryEventKey, HistoryEvent>as(null)
                    .withKeySerde(new HistoryEventKeySerde())
                    .withValueSerde(new HistoryEventSerde())
            )
            .reduce((e1, e2) -> e2,
                    Materialized.<HistoryEventKey, HistoryEvent, KeyValueStore<Bytes, byte[]>>as(streamByKeyStoreName)
                            .withKeySerde(new HistoryEventKeySerde()));
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...