Я строю микросервис весеннего облака, который использует данные из темы кафки.Что касается потребителя, я связываю эту тему с KStream.Входящие сообщения не содержат отметки времени, так как версия kafka ниже 0.10.Когда я анализирую входящие значения, он работает нормально.В противном случае, когда я сгруппировал их по ключу, он не использует «default.timestamp.extractor» (было установлено значение org.apache.kafka.streams.processor.WallclockTimestampExtractor).
Этот сервиспротестируйте его, используя другую версию kafka (выше или равно 0.10), и она работала нормально.
Вот мой конфиг:
spring: cloud: stream: kafka: streams:binder: brokers: $ {KAFKA_BROKERS} applicationId: конфигурация потока сообщений электронной почты: default.key.serde: org.apache.kafka.common.serialization.Serdes $ StringSerde default.value.serde: org.apache.kafka.common.serialization.Serdes $ StringSerde commit.interval.ms: 1000 default.timestamp.extractor: org.apache.kafka.streams.processor.WallclockTimestampExtractor poll.ms: 60000 # ВРЕМЯ БЛОКИРОВАНИЯ, ОЖИДАЯ БОЛЬШЕ СООБЩЕНИЙ buffered.records.per.partition: 2000
SomePart моего кода:
stream
.mapValues(this::mapMessage)
.groupBy(this::buildGroup, Serialized.with(new JsonSerde<>(Group.class), new JsonSerde<>(EmailMessage.class)))
.windowedBy(TimeWindows.of(WINDOW_TIME))
.aggregate(ArrayList::new, this::aggregate, Materialized.with(new JsonSerde<>(Group.class), new MessageListSerialization()))
.toStream()
.process(() -> new MailMessagesProcessor(emailService));
Это выдает мне эту ошибку: org.apache.kafka.streams.errors.StreamsException: Входная запись ConsumerRecord (topic = .....) Используйте другой TimestampExtractor для обработки этих данных.