Мне действительно нужна помощь!Я не могу извлечь временную метку для сообщения, отправленного производителем .В моем проекте, который я работаю с Json, у меня есть класс, в котором я определяю ключи, и класс, в котором я определяю значения сообщения, которое я отправлю через продюсера по теме «Raw».У меня есть 2 других класса, которые делают то же самое для выходного сообщения, которое мой потребитель прочитает по теме «Tdt».В основном классе KafkaStreams.java я определяю поток и сопоставляю ключи и значения.Запустив Kafka локально, я запускаю производителя, который пишет сообщение в «сырой» теме с ключами и значениями, а затем в другой оболочке потребитель начинает читать сообщение о выходе из темы «tdt».Как получить метку времени события?Мне нужно знать временную метку, в которой сообщение было отправлено производителем.Нужен ли мне TimestampExtractor?Вот мой основной класс kafkastreams (мое приложение отлично работает, мне просто нужна временная метка)
@Bean("app1StreamTopology")
public KStream<LibAssIbanRawKey, LibAssIbanRawValue> kStream() throws ParseException {
JsonSerde<Dwsitspr4JoinValue> Dwsitspr4JoinValueSerde = new JsonSerde<>(Dwsitspr4JoinValue.class);
KStream<LibAssIbanRawKey, LibAssIbanRawValue> stream = defaultKafkaStreamsBuilder.stream(inputTopic);
stream.peek((k,v) -> logger.info("Debug3 Chiave descrizione -> ({})",v.getCATRAPP()));
GlobalKTable<Integer, Dwsitspr4JoinValue> categoriaRapporto = defaultKafkaStreamsBuilder
.globalTable(temptiptopicname,
Consumed.with(Serdes.Integer(), Dwsitspr4JoinValueSerde)
// .withOffsetResetPolicy(Topology.AutoOffsetReset.EARLIEST)
);
logger.info("Debug3 Chiave descrizione -> ({})",categoriaRapporto.toString()) ;
stream.peek((k,v) -> logger.info("Debug4 Chiave descrizione -> ({})",v.getCATRAPP()) );
stream
.join(categoriaRapporto, (k, v) -> v.getCATRAPP(), (valueStream, valueGlobalKtable) -> {
// Value mapping
LibAssIbanTdtValue newValue = new LibAssIbanTdtValue();
newValue.setDescrizioneRidottaCodiceCategoriaDelRapporto(valueGlobalKtable.getDescrizioneRidotta());
newValue.setDescrizioneEstesaCodiceCategoriaDelRapporto(valueGlobalKtable.getDescrizioneEstesa());
newValue.setIdentificativo(valueStream.getAUD_CCID());
.
.
.//Other Value Mapped
.
.
.map((key, value) -> {
// Key mapping
LibAssIbanTdtKey newKey = new LibAssIbanTdtKey();
newKey.setData(dtf.format(localDate));
newKey.setIdentificatoreUnivocoDellaRigaDiTabella(key.getTABROWID());
return KeyValue.pair(newKey, value);
}).to(outputTopic, Produced.with(new JsonSerde<>(LibAssIbanTdtKey.class), new JsonSerde<>(LibAssIbanTdtValue.class)));
return stream;
}
}