Я пишу приложение KafkaStreams (v2.1.0), которое читает сообщения из обычной темы Kafka, объединяет эти сообщения слева с метаданными из сжатой темы журнала изменений и выполняет некоторую обработку с сохранением состояния с помощью Processor, который также запускается точечный вызов на регулярных временных интервалах, основанных на времени события. Сообщения в теме данных имеют временную метку, и определяется пользовательский TimestampExtractor (который определяет время события, которое я хотел бы использовать в вызове puctuate). Сообщения в теме метаданных, однако, не имеют отметки времени, но, похоже, KafkaStreams требует, чтобы TimestampExtractor все равно определялся. Теперь, если я использую встроенные метаданные в сообщениях Kafka ExtractRecordMetadataTimestamp
или просто WallclockTimestampExtractor
, это нарушает логику моего приложения, потому что кажется, что эти метки времени в теме метаданных также вызывают прерывистый вызов в моем процессоре, но событие - время в моей теме данных может отставать от часов настенного времени, и я хочу, чтобы на них был включен только пунктуальный вызов.
Мой вопрос заключается в том, как избежать прерывистого вызова по временным меткам, извлеченным из этой темы сжатых метаданных?
Одна хитрость, которая, кажется, работает на первый взгляд, это всегда возвращать 0 в качестве метки времени этих сообщений метаданных, но я не уверен, что это не будет иметь нежелательных побочных эффектов. Другой обходной путь - не полностью полагаться на «пунктуацию» и реализовывать собственное отслеживание времени события, но я бы предпочел использовать стандартный подход KafkaStreams. Так, может быть, есть другой способ решить эту проблему?
Это структура моего приложения:
KStream<String, Data> input =
streamsBuilder.stream(dataTopic,
Consumed.with(Serdes.String(),
new DataSerde(),
new DataTimestampExtractor(),
Topology.AutoOffsetReset.LATEST));
KTable<String, Metadata> metadataTable =
streamsBuilder.table(metadataTopic,
Consumed.with(Serdes.String(),
new MetadataSerde(),
new WhichTimestampExtractorToUse(),
Topology.AutoOffsetReset.EARLIEST));
input.leftJoin(metadataTable, this::joiner)
.process(new ProcessorUsingPunctuateOnEventTime());