Во-первых, эти показания приходят с какого-то устройства? Если да, у вас есть уникальный идентификатор (UUID) для них? Я хотел бы добавить это в ваш поток, так что он хотел бы, как UUID, GpsDateTime, lat, lon
.
Вам нужно будет создать довольно простое приложение Kafka Streams. В этом приложении вы будете хранить самые последние чтения из вашего потока в StoreBuilder. Затем, когда новое сообщение получено от Kafka, вы получите это последнее значение, выполните вычисления и сохраните новые значения lat, long в StoreBuilder.
Конечно, мне непонятно, хотите ли вы, чтобы когда-либо имели одно значение lat, long и все последующие значения вычисляются из 1-го чтения. Или, если вы хотите иметь скользящие вычисления, где вы всегда сравниваете расстояние между последним и текущим показаниями.
В любом случае, вы можете увидеть этот код на практике по адресу: https://github.com/confluentinc/kafka-streams-examples/blob/5.0.0-post/src/test/java/io/confluent/examples/streams/StateStoresInTheDSLIntegrationTest.java
Этот пример является примером подсчета слов, но может быть быстро преобразован для вашего варианта использования.
Статический конечный класс WordCountTransformerSupplier (строка 78) станет вашим LatLongDistanceComputation.
Вы должны создать StoreBuilder (строка 154) с правильными типами (независимо от того, как вы храните свой широту / долготу).
Строка 165 - это то место, где элемент фактически читается из потока значений.
И, конечно, вам нужно отредактировать inputTopic и outputTopic (строка 66-67) среди нескольких других вещей.