У меня есть одно приложение Kafka Streams (V 2.1.1), которое создает записи и помещает их в тему вывода в формате значения ключа.
ключ - это оконная временная шкала, где я ожидаю ключ и указатель времени начала / окончания окна.
Пример -
.to(kafkaOutPutTopic, Produced.with(windowedSerde, jsonSerde));
Образец - [KEY @ 1551807076000/1551807077000]
где KEY - ключ, время начала - 1551807076000 и время окончания - 1551807077000
Где WindowedSerde
StringSerializer stringSerializer = new StringSerializer();
final TimeWindowedSerializer<String> windowedSerializer = new TimeWindowedSerializer(stringSerializer);
final TimeWindowedDeserializer<String> windowedDeSerializer = new TimeWindowedDeserializer();
final Serde<Windowed<String>> windowedSerde = Serdes.serdeFrom(windowedSerializer, windowedDeSerializer);
Существует еще один компонент, называемый kafka consumer, который пытается получить сообщение из темы и получить ключ и время начала / окончания окна с помощью DeSerializering с помощью пользовательского класса.
Кафка недвижимости:
kafkaConsumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, TimeWindowedDeserializer.class.getName());
Я использую TimeWindowedDeserializer.java из прикрепленной ссылки - https://gist.github.com/nfo/eaf350afb5667a3516593da4d48e757a
, но позволяют получить время окончания окна, и потребитель не может его использовать из-за десериализации.