На KGroupedStream происходит сбой экстрактора метки времени Kafka Stream - PullRequest
0 голосов
/ 08 июня 2019

Я строю микросервис весеннего облака, который использует данные из темы кафки.Что касается потребителя, я связываю эту тему с KStream.Входящие сообщения не содержат отметки времени, так как версия kafka ниже 0.10.Когда я анализирую входящие значения, он работает нормально.В противном случае, когда я сгруппировал их по ключу, он не использует «default.timestamp.extractor» (было установлено значение org.apache.kafka.streams.processor.WallclockTimestampExtractor).

Этот сервиспротестируйте его, используя другую версию kafka (выше или равно 0.10), и она работала нормально.

Вот мой конфиг:

spring: cloud: stream: kafka: streams:binder: brokers: $ {KAFKA_BROKERS} applicationId: конфигурация потока сообщений электронной почты: default.key.serde: org.apache.kafka.common.serialization.Serdes $ StringSerde default.value.serde: org.apache.kafka.common.serialization.Serdes $ StringSerde commit.interval.ms: 1000 default.timestamp.extractor: org.apache.kafka.streams.processor.WallclockTimestampExtractor poll.ms: 60000 # ВРЕМЯ БЛОКИРОВАНИЯ, ОЖИДАЯ БОЛЬШЕ СООБЩЕНИЙ buffered.records.per.partition: 2000

SomePart моего кода:

    stream
        .mapValues(this::mapMessage)
        .groupBy(this::buildGroup, Serialized.with(new JsonSerde<>(Group.class), new JsonSerde<>(EmailMessage.class)))
        .windowedBy(TimeWindows.of(WINDOW_TIME))
        .aggregate(ArrayList::new, this::aggregate, Materialized.with(new JsonSerde<>(Group.class), new MessageListSerialization()))
        .toStream()
        .process(() -> new MailMessagesProcessor(emailService));

Это выдает мне эту ошибку: org.apache.kafka.streams.errors.StreamsException: Входная запись ConsumerRecord (topic = .....) Используйте другой TimestampExtractor для обработки этих данных.

1 Ответ

0 голосов
/ 09 июня 2019

Kafka Streams требует брокеров 0.10.0 или новее.Он не совместим со старыми брокерами.

  • Kafka Streams 0.10.0, совместим только с брокерами 0.10.0 (или новее).

  • Kafka Streams 0.10.1 и новее, обратно совместим с 0.10.1 (но не с более старыми брокерами) и совместим с более новыми брокерами.

  • Кроме того, начиная с Kafka Streams 1.0, формат сообщения 0.10 (или выше) требуется.Следовательно, даже если вы обновите своих брокеров до 0.10.0 (или выше), если ваш формат сообщения также не обновлен, он также не будет работать.

  • Для использования "точно"-once ", требуется версия брокера 0.11.0 (или выше).

Подробнее см. https://docs.confluent.io/current/streams/upgrade-guide.html#compatibility

...