Kafka Consumer не может десериализовать временный ключ с начальным и конечным временем - PullRequest
0 голосов
/ 09 мая 2019

У меня есть одно приложение Kafka Streams (V 2.1.1), которое создает записи и помещает их в тему вывода в формате значения ключа.

ключ - это оконная временная шкала, где я ожидаю ключ и указатель времени начала / окончания окна.

Пример -

.to(kafkaOutPutTopic, Produced.with(windowedSerde, jsonSerde));

Образец - [KEY @ 1551807076000/1551807077000] где KEY - ключ, время начала - 1551807076000 и время окончания - 1551807077000

Где WindowedSerde

StringSerializer stringSerializer = new StringSerializer();

final TimeWindowedSerializer<String> windowedSerializer = new TimeWindowedSerializer(stringSerializer);

final TimeWindowedDeserializer<String> windowedDeSerializer = new TimeWindowedDeserializer();

final Serde<Windowed<String>> windowedSerde = Serdes.serdeFrom(windowedSerializer, windowedDeSerializer);

Существует еще один компонент, называемый kafka consumer, который пытается получить сообщение из темы и получить ключ и время начала / окончания окна с помощью DeSerializering с помощью пользовательского класса.

Кафка недвижимости:

kafkaConsumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, TimeWindowedDeserializer.class.getName()); 

Я использую TimeWindowedDeserializer.java из прикрепленной ссылки - https://gist.github.com/nfo/eaf350afb5667a3516593da4d48e757a

, но позволяют получить время окончания окна, и потребитель не может его использовать из-за десериализации.

1 Ответ

0 голосов
/ 12 мая 2019

Кажется, вы бьете https://issues.apache.org/jira/browse/KAFKA-7110

Это исправлено в 2.2.0, что позволяет передавать размер окна в конструктор или TimeWindows.

Обратите внимание, что ни метка времени окончания окна, ни размер окна не сохраняются в данных. Это оптимизация хранения, потому что для TimeWindows размер одинаков для всех окон, и, таким образом, отметка времени окончания может быть вычислена на основе отметки времени начала плюс размер окна.

...