Исключение LongDeserializer при чтении KTable с использованием spring-kakfa-stream - PullRequest
0 голосов
/ 03 мая 2018

Я пытаюсь прочитать KTable, используя проект spring-cloud-stream-binder-kafka-streams. Можем ли мы прочитать KTable, используя spring @StreamListener и все интерфейсы, которые предоставляет spring-cloud-streams для обмена сообщениями.

Я получаю исключение LongDeserializer при чтении KTable.

Я использую springCloudVersion = 'Finchley.RC1' springBootVersion = '2.0.1.RELEASE'

Ссылка github на проект доступна по адресу, https://github.com/jaysara/KStreamAnalytics

Вот трассировка стека,

Exception in thread "panalytics-ac0fa75f-2ae4-4b26-9a04-1f80d1479112-StreamThread-2" org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.
    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:74)
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:91)
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:549)
    at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:920)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:821)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8

1 Ответ

0 голосов
/ 03 мая 2018

Вам необходимо раскомментировать эту строку: https://github.com/jaysara/KStreamAnalytics/blob/master/src/main/resources/application.properties#L19

spring.cloud.stream.bindings.policyPaidAnalytic.producer.useNativeEncoding=true

По умолчанию связыватель пытается сериализоваться на исходящем и использовать application/json в качестве типа контента. Итак, в вашем случае, он выходил как json (String), и именно поэтому вы получили исключение Long для сериализации. Установив для указанного выше флага значение true, вы просите связыватель остаться и позволить потокам Kafka изначально сериализоваться с помощью LongSerde.

При повторном запуске вы можете очистить тему policyAnalytic или использовать новую тему.

Надеюсь, это поможет.

...