Я реализую UDF K SQL, который будет определять, находится ли данная пара (широта, долгота) в геозоне (много лат-лонных пар, образующих многоугольник) или нет. Я черпал вдохновение из этого проекта (https://github.com/gschmutz/various-demos/tree/master/kafka-geofencing) и написал свой собственный класс JAVA (см. Ниже) с ограниченными функциональными возможностями, которые мне необходимы.
import org.geotools.geometry.jts.JTSFactoryFinder;
import org.locationtech.jts.geom.Coordinate;
import org.locationtech.jts.geom.GeometryFactory;
import org.locationtech.jts.geom.Point;
import org.locationtech.jts.geom.Polygon;
import org.locationtech.jts.io.ParseException;
import org.locationtech.jts.io.WKTReader;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
name = "geofence",
description = "Determines if a lat/long is inside or outside the geometry passed as the third parameter as a WKT coded string",
version = "0.1.0",
author = "xyz"
public class GeoFence {
private static GeometryFactory geometryFactory = JTSFactoryFinder.getGeometryFactory();
private static WKTReader wktReader = new WKTReader(geometryFactory);
* Determines if a lat/long is inside or outside the geometry passed as the third parameter as a WKT coded string.
* Returns either INSIDE or OUSIDE.
* @param latitude
* @param longitude
* @param geometryWKT
* @return
@Udf(description = "Determines if a lat/long is inside or outside the geometry passed as the third parameter as a WKT coded string")
public static boolean geofence(final double latitude, final double longitude, String geometryWKT) {
boolean status = false;
Polygon polygon = null;
try {
polygon = (Polygon) wktReader.read(geometryWKT);
// However, an important point to note is that the longitude is the X value
// and the latitude the Y value. So we say "lat/long",
// but JTS will expect it in the order "long/lat".
Coordinate coord = new Coordinate(longitude, latitude);
Point point = geometryFactory.createPoint(coord);
status = point.within(polygon);
} catch (ParseException e) {
throw new RuntimeException(e.getMessage());
return status;
и ниже - мой POM. xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
Geo-Processing Utilities
<name>Open Source Geospatial Foundation Repository</name>
<name>Boundless Maven Repository</name>
<!-- Other properties such as kafka.version are derived from parent project(s) such as
https://github.com/confluentinc/common (see common's pom.xml for kafka.version).
<!-- JUnit 5 requires Surefire version 2.22.1 or higher -->
<!--TODO: enable this once we have warnings under control<arg>-Werror</arg>-->
Когда я создаю проект Maven, успешно создается банка. Когда я пытаюсь перезапустить Confluent K SQL -Server с помощью команды "confluent local start k sql -server" K-1019 * -Server не запускается. Когда я проверяю журналы K SQL -Server с помощью команды "confluent log local k sql -server", я вижу следующее сообщение:
io.github.lukehutch.fastclasspathscanner.MatchProcessorException: java.lang.NoClassDefFoundError: org/locationtech/jts/io/ParseException
at io.github.lukehutch.fastclasspathscanner.MatchProcessorException.newInstance(MatchProcessorException.java:81)
at io.github.lukehutch.fastclasspathscanner.scanner.ScanSpec.callMatchProcessors(ScanSpec.java:757)
at io.github.lukehutch.fastclasspathscanner.FastClasspathScanner.scan(FastClasspathScanner.java:1606)
at io.github.lukehutch.fastclasspathscanner.FastClasspathScanner.scan(FastClasspathScanner.java:1678)
at io.github.lukehutch.fastclasspathscanner.FastClasspathScanner.scan(FastClasspathScanner.java:1704)
at io.confluent.ksql.function.UserFunctionLoader.loadFunctions(UserFunctionLoader.java:135)
at io.confluent.ksql.function.UserFunctionLoader.lambda$load$2(UserFunctionLoader.java:97)
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
Я написал еще один простой UDF, который работает и работает отлично в К SQL -Сервер. Кто-нибудь может помочь выяснить, в чем проблема с этим UDF?