Я пытался посчитать событие из KStream в период времени:
KStream<String, VehicleEventTO> stream = builder.stream("vehicle", Consumed.with(Serdes.String(), new JsonSerde<>(VehicleEventTO.class)));
KStream<String, VehicleEventTO> streamWithKey = stream.selectKey((key, value) -> value.getId_vehicle().toString());
KStream<String, Long> streamValueKey = streamWithKey.map((key, value) -> KeyValue.pair(key, value.getId_vehicle()));
streamValueKey.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMinutes(10).toMillis()))
.count(Materialized.with(Serdes.String(), new JsonSerde<>(Long.class)));
У меня есть это исключение:
Исключение в потоке "test-app-87ce164d-c427-4dcf-aa76-aeeb6f8fc943-StreamThread-1 "org.apache.kafka.streams.errors.StreamsException: исключение, обнаруженное в процессе.taskId = 0_0, процессор = KSTREAM-SOURCE-0000000000, тема = транспортное средство, раздел = 0, смещение = 160385 в org.apache.kafka.streams.processor.internals.StreamTask.process (StreamTask.java:318) в org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process (AssignedStreamsTasks.java:94) в org.apache.kafka.streams.processor.internals.TaskManager.process (TaskManager.java:409) в org.apache.kaf.processor.internals.StreamThread.processAndMaybeCommit (StreamThread.java:964) в org.apache.kafka.streams.processor.internals.StreamThread.runOnce (StreamThread.java:832) в org.apache.kafka.streams.processor.internals.StreamThread.runLoop (StreamThread.java:767) в org.apache.kafka.streams.processor.internals.StreamThread.run (StreamThread.java:736) Вызывается: org.apache.kafka.streams.errors.StreamsException: AСериализатор (ключ: org.apache.kafka.common.serialization.ByteArraySerializer / value: org.apache.kafka.common.serialization.ByteArraySerializer) не совместим с фактическим ключом или vтип alue (тип ключа: java.lang.String / тип значения: java.lang.Long).Измените Serdes по умолчанию в StreamConfig или предоставьте правильные Serdes через параметры метода.