У меня есть приложение, в котором я обрабатываю поток и преобразую его в другой.Вот пример:
public void run(final String... args) {
final Serde<Event> eventSerde = new EventSerde();
final Properties props = streamingConfig.getProperties(
applicationName,
concurrency,
Serdes.String(),
eventSerde
);
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, EventTimestampExtractor.class);
final StreamsBuilder builder = new StreamsBuilder();
KStream<String, Event> eventStream = builder.stream(inputStream);
final Serde<Device> deviceSerde = new DeviceSerde();
eventStream
.map((key, event) -> {
final Device device = modelMapper.map(event, Device.class);
return new KeyValue<>(key, device);
})
.to("device_topic", Produced.with(Serdes.String(), deviceSerde));
final Topology topology = builder.build();
final KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
}
Вот некоторые подробности о приложении:
Spring Boot 1.5.17
Kafka 2.1.0
Kafka Streams 2.1.0
Spring Kafka 1.3.6
Хотя в сообщениях внутри входного потока установлена временная метка, я также помещаю реализациюTimestampExtractor, чтобы убедиться, что правильная временная метка прикреплена ко всем сообщениям (поскольку другие производители могут отправлять сообщения в одну и ту же тему).
Внутри кода я получаю поток событий и в основном конвертирую их в разные объекты.и в конечном итоге направить эти объекты в разные потоки.
Я пытаюсь понять, прикреплена ли установленная мной начальная временная метка к сообщениям, опубликованным в device_topic, в данном конкретном случае.
Принимающая сторона(потока устройства) выглядит следующим образом:
@KafkaListener(topics = "device_topic")
public void onDeviceReceive(final Device device, @Header(KafkaHeaders.RECEIVED_TIMESTAMP) final long timestamp) {
log.trace("[{}] Received device: {}", timestamp, device);
}
К сожалению, напечатанная метка времени, похоже, является временем настенных часов.Это ожидаемое поведение или я что-то упустил?