Как избежать временных меток, извлеченных из темы журнала изменений, чтобы вызвать Processor :: punctuate - PullRequest
0 голосов
/ 14 января 2019

Я пишу приложение KafkaStreams (v2.1.0), которое читает сообщения из обычной темы Kafka, объединяет эти сообщения слева с метаданными из сжатой темы журнала изменений и выполняет некоторую обработку с сохранением состояния с помощью Processor, который также запускается точечный вызов на регулярных временных интервалах, основанных на времени события. Сообщения в теме данных имеют временную метку, и определяется пользовательский TimestampExtractor (который определяет время события, которое я хотел бы использовать в вызове puctuate). Сообщения в теме метаданных, однако, не имеют отметки времени, но, похоже, KafkaStreams требует, чтобы TimestampExtractor все равно определялся. Теперь, если я использую встроенные метаданные в сообщениях Kafka ExtractRecordMetadataTimestamp или просто WallclockTimestampExtractor, это нарушает логику моего приложения, потому что кажется, что эти метки времени в теме метаданных также вызывают прерывистый вызов в моем процессоре, но событие - время в моей теме данных может отставать от часов настенного времени, и я хочу, чтобы на них был включен только пунктуальный вызов.

Мой вопрос заключается в том, как избежать прерывистого вызова по временным меткам, извлеченным из этой темы сжатых метаданных?

Одна хитрость, которая, кажется, работает на первый взгляд, это всегда возвращать 0 в качестве метки времени этих сообщений метаданных, но я не уверен, что это не будет иметь нежелательных побочных эффектов. Другой обходной путь - не полностью полагаться на «пунктуацию» и реализовывать собственное отслеживание времени события, но я бы предпочел использовать стандартный подход KafkaStreams. Так, может быть, есть другой способ решить эту проблему?

Это структура моего приложения:

    KStream<String, Data> input =
        streamsBuilder.stream(dataTopic,
                              Consumed.with(Serdes.String(),
                                            new DataSerde(),
                                            new DataTimestampExtractor(),
                                            Topology.AutoOffsetReset.LATEST));

    KTable<String, Metadata> metadataTable =
        streamsBuilder.table(metadataTopic,
                             Consumed.with(Serdes.String(),
                                           new MetadataSerde(),
                                           new WhichTimestampExtractorToUse(),
                                           Topology.AutoOffsetReset.EARLIEST));

    input.leftJoin(metadataTable, this::joiner)
         .process(new ProcessorUsingPunctuateOnEventTime());
...