KSQL - вычислить расстояние от 2 сообщений, используя GEO_DISTANCE - PullRequest
0 голосов
/ 11 сентября 2018

У меня есть тема кафки, и каждое сообщение в теме имеет широту / долготу и метку времени события. Создал поток, ссылающийся на тему, и хотел бы рассчитать расстояние между двумя точками, используя geo_distance. Пример

GpsDateTime            lat              lon
2016-11-30 22:38:36,    32.685757,  -96.735942
2016-11-30 22:39:07,    32.687347,  -96.732841
2016-11-30 22:39:37,    32.68805,   -96.729726 

Я хотел бы создать новый поток в вышеупомянутом потоке и обогатить его расстоянием.

GpsDateTime            lat              lon          Distance
2016-11-30 22:38:36,    32.685757,  -96.735942        0
2016-11-30 22:39:07,    32.687347,  -96.732841        0.340
2016-11-30 22:39:37,    32.68805,   -96.729726        0.302

Можно ли достичь желаемых результатов с помощью KSQL? Или как сослаться на предыдущее сообщение при обработке нового сообщения?

1 Ответ

0 голосов
/ 14 сентября 2018

Во-первых, эти показания приходят с какого-то устройства? Если да, у вас есть уникальный идентификатор (UUID) для них? Я хотел бы добавить это в ваш поток, так что он хотел бы, как UUID, GpsDateTime, lat, lon.

Вам нужно будет создать довольно простое приложение Kafka Streams. В этом приложении вы будете хранить самые последние чтения из вашего потока в StoreBuilder. Затем, когда новое сообщение получено от Kafka, вы получите это последнее значение, выполните вычисления и сохраните новые значения lat, long в StoreBuilder.

Конечно, мне непонятно, хотите ли вы, чтобы когда-либо имели одно значение lat, long и все последующие значения вычисляются из 1-го чтения. Или, если вы хотите иметь скользящие вычисления, где вы всегда сравниваете расстояние между последним и текущим показаниями.

В любом случае, вы можете увидеть этот код на практике по адресу: https://github.com/confluentinc/kafka-streams-examples/blob/5.0.0-post/src/test/java/io/confluent/examples/streams/StateStoresInTheDSLIntegrationTest.java

Этот пример является примером подсчета слов, но может быть быстро преобразован для вашего варианта использования.

Статический конечный класс WordCountTransformerSupplier (строка 78) станет вашим LatLongDistanceComputation.

Вы должны создать StoreBuilder (строка 154) с правильными типами (независимо от того, как вы храните свой широту / долготу).

Строка 165 - это то место, где элемент фактически читается из потока значений.

И, конечно, вам нужно отредактировать inputTopic и outputTopic (строка 66-67) среди нескольких других вещей.

...