Ошибка сериализации в Serdes при запуске приложения Streams - PullRequest
1 голос
/ 31 мая 2019

Я пишу приложение, которое обрабатывает входящую запись координат и отслеживает их на вход или выход в определенных геозонах.Между тем, я поддерживаю состояние, находится ли текущая координата внутри / снаружи определенной геозоны, используя агрегацию.

Ниже приведена трассировка стека ошибки при выполнении этого:

Exception in thread "geofence-events-990be707-d44d-410d-a8a7-b2a3b90cb84f-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000003, topic=vehicle-sensor-data, partition=0, offset=0
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:367)
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:104)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:413)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:862)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.StringSerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: com.loconav.GeoFenceDTO). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:94)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:146)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:129)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:93)
    at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:43)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:115)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:146)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:129)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:93)
    at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:115)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:146)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:129)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:93)
    at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:115)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:146)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:129)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:93)
    at org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues$KStreamFlatMapValuesProcessor.process(KStreamFlatMapValues.java:42)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:115)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:146)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:129)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:93)
    at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:73)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:115)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:146)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:129)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:93)
    at org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues$KStreamFlatMapValuesProcessor.process(KStreamFlatMapValues.java:42)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:115)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:146)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:129)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:93)
    at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:115)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:146)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:129)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:93)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:351)
    ... 5 more
Caused by: java.lang.ClassCastException: class com.loconav.GeoFenceDTO cannot be cast to class java.lang.String (com.loconav.GeoFenceDTO is in unnamed module of loader 'app'; java.lang.String is in module java.base of loader 'bootstrap')
    at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28)
    at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:60)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:157)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:101)
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89)
    ... 45 more

Я написалвзгляд после flatMapValues ​​и первая запись печатается, а затем происходит это исключение.Ниже приведен код топологии

vehicleGeoFences
  .flatMapValues(
      (readOnlyKey, value) -> {
        List<GeoFenceDTO> geoFenceDTOS = new ArrayList<>();
        // some logic
        return geoFenceDTOS;
      })
  .groupBy((key, value) -> value.getGeofenceId() + "," + value.getVehicleId())
  .aggregate(
      GeoFenceDTO::new,
      (key, value, aggregate) -> {

        //setting values
        return geofenceDTO;
      },
      Materialized.with(Serdes.String(), getGeofenceDTOValueSerde()))
  .toStream()
  .mapValues(
      (readOnlyKey, value) -> {
        // changing object
      })
  .to(
      Topics.VEHICLE_EVENTS.name(),
      Produced.with(vehicleEventsKeySerde, vehicleEventsValueSerde));

. Я написал пользовательский Serde для GeofenceDTO, который использует Джексон ObjectMapper для преобразования объекта в байты и наоборот.Так или иначе, параметры, представленные в Материализованном виде, не выбираются.Значением ключа по умолчанию Serde для приложения является StringSerde.

Помощь приветствуется.ТИА.Я попытался изменить пользовательский объект на сгенерированный Avro, но ошибка не исчезла.

...