Что происходит с отметкой времени сообщения в потоке, когда оно отображается в другом потоке? - PullRequest
0 голосов
/ 19 февраля 2019

У меня есть приложение, в котором я обрабатываю поток и преобразую его в другой.Вот пример:

public void run(final String... args) {
    final Serde<Event> eventSerde = new EventSerde();

    final Properties props = streamingConfig.getProperties(
        applicationName,
        concurrency,
        Serdes.String(),
        eventSerde
    );

    props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
    props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, EventTimestampExtractor.class);

    final StreamsBuilder builder = new StreamsBuilder();

    KStream<String, Event> eventStream = builder.stream(inputStream);

    final Serde<Device> deviceSerde = new DeviceSerde();

    eventStream
        .map((key, event) -> {
            final Device device = modelMapper.map(event, Device.class);

            return new KeyValue<>(key, device);
        })
        .to("device_topic", Produced.with(Serdes.String(), deviceSerde));

    final Topology topology = builder.build();
    final KafkaStreams streams = new KafkaStreams(topology, props);

    streams.start();
}

Вот некоторые подробности о приложении:

Spring Boot 1.5.17
Kafka 2.1.0
Kafka Streams 2.1.0
Spring Kafka 1.3.6

Хотя в сообщениях внутри входного потока установлена ​​временная метка, я также помещаю реализациюTimestampExtractor, чтобы убедиться, что правильная временная метка прикреплена ко всем сообщениям (поскольку другие производители могут отправлять сообщения в одну и ту же тему).

Внутри кода я получаю поток событий и в основном конвертирую их в разные объекты.и в конечном итоге направить эти объекты в разные потоки.

Я пытаюсь понять, прикреплена ли установленная мной начальная временная метка к сообщениям, опубликованным в device_topic, в данном конкретном случае.

Принимающая сторона(потока устройства) выглядит следующим образом:

@KafkaListener(topics = "device_topic")
public void onDeviceReceive(final Device device, @Header(KafkaHeaders.RECEIVED_TIMESTAMP) final long timestamp) {
    log.trace("[{}] Received device: {}", timestamp, device);
}

К сожалению, напечатанная метка времени, похоже, является временем настенных часов.Это ожидаемое поведение или я что-то упустил?

1 Ответ

0 голосов
/ 19 февраля 2019

Spring Kafka 1.3.x использует очень старый клиент 0.11;возможно это не распространяет метку времени.Я только что протестировал с Boot 2.1.3 и Spring Kafka 2.2.4, и отметка времени распространяется нормально ...

@SpringBootApplication
@EnableKafkaStreams
public class So54771130Application {

    public static void main(String[] args) {
        SpringApplication.run(So54771130Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("so54771130", 0, 42L, null, "baz");
        };
    }

    @Bean
    public KStream<String, String> stream(StreamsBuilder builder) {
        KStream<String, String> stream = builder.stream("so54771130");
        stream
            .map((k, v) -> {
                System.out.println("Mapping:"  + v);
                return new KeyValue<>(null, "bar");
            })
            .to("so54771130-1");
        return stream;
    }

    @Bean
    public NewTopic topic1() {
        return new NewTopic("so54771130", 1, (short) 1);
    }

    @Bean
    public NewTopic topic2() {
        return new NewTopic("so54771130-1", 1, (short) 1);
    }

    @KafkaListener(id = "so54771130", topics = "so54771130-1")
    public void listen(String in, @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) {
        System.out.println(in + "@" + ts);
    }

}

и

Mapping:baz
bar@42
...