Извлечение временной метки из сообщения производителя - PullRequest
0 голосов
/ 24 сентября 2019

Мне действительно нужна помощь!Я не могу извлечь временную метку для сообщения, отправленного производителем .В моем проекте, который я работаю с Json, у меня есть класс, в котором я определяю ключи, и класс, в котором я определяю значения сообщения, которое я отправлю через продюсера по теме «Raw».У меня есть 2 других класса, которые делают то же самое для выходного сообщения, которое мой потребитель прочитает по теме «Tdt».В основном классе KafkaStreams.java я определяю поток и сопоставляю ключи и значения.Запустив Kafka локально, я запускаю производителя, который пишет сообщение в «сырой» теме с ключами и значениями, а затем в другой оболочке потребитель начинает читать сообщение о выходе из темы «tdt».Как получить метку времени события?Мне нужно знать временную метку, в которой сообщение было отправлено производителем.Нужен ли мне TimestampExtractor?Вот мой основной класс kafkastreams (мое приложение отлично работает, мне просто нужна временная метка)

  @Bean("app1StreamTopology") 
        public KStream<LibAssIbanRawKey, LibAssIbanRawValue> kStream() throws ParseException {
    JsonSerde<Dwsitspr4JoinValue> Dwsitspr4JoinValueSerde = new JsonSerde<>(Dwsitspr4JoinValue.class);

            KStream<LibAssIbanRawKey, LibAssIbanRawValue> stream = defaultKafkaStreamsBuilder.stream(inputTopic);

            stream.peek((k,v) -> logger.info("Debug3 Chiave descrizione -> ({})",v.getCATRAPP()));


            GlobalKTable<Integer, Dwsitspr4JoinValue> categoriaRapporto = defaultKafkaStreamsBuilder
                    .globalTable(temptiptopicname, 
                            Consumed.with(Serdes.Integer(), Dwsitspr4JoinValueSerde)
                            //                      .withOffsetResetPolicy(Topology.AutoOffsetReset.EARLIEST)
                            );

            logger.info("Debug3 Chiave descrizione -> ({})",categoriaRapporto.toString()) ;
            stream.peek((k,v) -> logger.info("Debug4 Chiave descrizione -> ({})",v.getCATRAPP()) );

            stream
            .join(categoriaRapporto, (k, v) -> v.getCATRAPP(), (valueStream, valueGlobalKtable) -> {

                // Value mapping
                LibAssIbanTdtValue newValue = new LibAssIbanTdtValue();
                newValue.setDescrizioneRidottaCodiceCategoriaDelRapporto(valueGlobalKtable.getDescrizioneRidotta());
                newValue.setDescrizioneEstesaCodiceCategoriaDelRapporto(valueGlobalKtable.getDescrizioneEstesa()); 
                newValue.setIdentificativo(valueStream.getAUD_CCID());
.
.
.//Other Value Mapped
.
.
    .map((key, value) -> {
                // Key mapping
                LibAssIbanTdtKey newKey = new LibAssIbanTdtKey();
                newKey.setData(dtf.format(localDate));
                newKey.setIdentificatoreUnivocoDellaRigaDiTabella(key.getTABROWID());

                return KeyValue.pair(newKey, value);
            }).to(outputTopic, Produced.with(new JsonSerde<>(LibAssIbanTdtKey.class), new JsonSerde<>(LibAssIbanTdtValue.class)));
            return stream;
        }
    }

1 Ответ

0 голосов
/ 25 сентября 2019

Да, вам нужен TimestampExtractor.

public class YourTimestampExtractor implements TimestampExtractor {


    @Override
    public long extract(ConsumerRecord<Object, Object> consumerRecord, long l) {

        // do whatever you want with the timestamp available with consumerRecord.timestamp()
        ...

        // return here the timestamp you want to use (here default)
        return consumerRecord.timestamp();
    }
}

Вам нужно указать потоку kafka, какой экстрактор использовать под ключом StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG

...