"WindowedBy Count KStream" выдает исключение StreamsException - PullRequest
1 голос
/ 01 июля 2019

Я пытался посчитать событие из KStream в период времени:

    KStream<String, VehicleEventTO> stream = builder.stream("vehicle", Consumed.with(Serdes.String(), new JsonSerde<>(VehicleEventTO.class)));

    KStream<String, VehicleEventTO> streamWithKey = stream.selectKey((key, value) -> value.getId_vehicle().toString());

    KStream<String, Long> streamValueKey = streamWithKey.map((key, value) -> KeyValue.pair(key, value.getId_vehicle()));

    streamValueKey.groupByKey()
                  .windowedBy(TimeWindows.of(Duration.ofMinutes(10).toMillis()))
                  .count(Materialized.with(Serdes.String(), new JsonSerde<>(Long.class)));

У меня есть это исключение:

Исключение в потоке "test-app-87ce164d-c427-4dcf-aa76-aeeb6f8fc943-StreamThread-1 "org.apache.kafka.streams.errors.StreamsException: исключение, обнаруженное в процессе.taskId = 0_0, процессор = KSTREAM-SOURCE-0000000000, тема = транспортное средство, раздел = 0, смещение = 160385 в org.apache.kafka.streams.processor.internals.StreamTask.process (StreamTask.java:318) в org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process (AssignedStreamsTasks.java:94) в org.apache.kafka.streams.processor.internals.TaskManager.process (TaskManager.java:409) в org.apache.kaf.processor.internals.StreamThread.processAndMaybeCommit (StreamThread.java:964) в org.apache.kafka.streams.processor.internals.StreamThread.runOnce (StreamThread.java:832) в org.apache.kafka.streams.processor.internals.StreamThread.runLoop (StreamThread.java:767) в org.apache.kafka.streams.processor.internals.StreamThread.run (StreamThread.java:736) Вызывается: org.apache.kafka.streams.errors.StreamsException: AСериализатор (ключ: org.apache.kafka.common.serialization.ByteArraySerializer / value: org.apache.kafka.common.serialization.ByteArraySerializer) не совместим с фактическим ключом или vтип alue (тип ключа: java.lang.String / тип значения: java.lang.Long).Измените Serdes по умолчанию в StreamConfig или предоставьте правильные Serdes через параметры метода.

1 Ответ

3 голосов
/ 01 июля 2019

groupByKey() использует сериализаторы по умолчанию:

groupByKey()

Группирует записи по их текущему ключу в KGroupedStream при сохраненииисходные значения и сериализаторы и десериализаторы по умолчанию.

Вы должны использовать groupByKey(Serialized<K,V> serialized) или groupByKey(Grouped<K,V> grouped).

Следующее должно сделать трюк:

streamValueKey.groupByKey(Serialized.with(Serdes.String(), Serdes.Long()))
              .windowedBy(TimeWindows.of(Duration.ofMinutes(10).toMillis()))
              .count(Materialized.with(Serdes.String(), new JsonSerde<>(Long.class)));
...