Я реализую UDF K SQL, который будет определять, находится ли данная пара (широта, долгота) в геозоне (много лат-лонных пар, образующих многоугольник) или нет. Я черпал вдохновение из этого проекта (https://github.com/gschmutz/various-demos/tree/master/kafka-geofencing) и написал свой собственный класс JAVA (см. Ниже) с ограниченными функциональными возможностями, которые мне необходимы.
import org.geotools.geometry.jts.JTSFactoryFinder;
import org.locationtech.jts.geom.Coordinate;
import org.locationtech.jts.geom.GeometryFactory;
import org.locationtech.jts.geom.Point;
import org.locationtech.jts.geom.Polygon;
import org.locationtech.jts.io.ParseException;
import org.locationtech.jts.io.WKTReader;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
@UdfDescription(
name = "geofence",
description = "Determines if a lat/long is inside or outside the geometry passed as the third parameter as a WKT coded string",
version = "0.1.0",
author = "xyz"
)
public class GeoFence {
private static GeometryFactory geometryFactory = JTSFactoryFinder.getGeometryFactory();
private static WKTReader wktReader = new WKTReader(geometryFactory);
/**
* Determines if a lat/long is inside or outside the geometry passed as the third parameter as a WKT coded string.
* Returns either INSIDE or OUSIDE.
* @param latitude
* @param longitude
* @param geometryWKT
* @return
*/
@Udf(description = "Determines if a lat/long is inside or outside the geometry passed as the third parameter as a WKT coded string")
public static boolean geofence(final double latitude, final double longitude, String geometryWKT) {
boolean status = false;
Polygon polygon = null;
try {
polygon = (Polygon) wktReader.read(geometryWKT);
// However, an important point to note is that the longitude is the X value
// and the latitude the Y value. So we say "lat/long",
// but JTS will expect it in the order "long/lat".
Coordinate coord = new Coordinate(longitude, latitude);
Point point = geometryFactory.createPoint(coord);
status = point.within(polygon);
} catch (ParseException e) {
throw new RuntimeException(e.getMessage());
}
return status;
}
}
и ниже - мой POM. xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.trivadis.sample.geofencing</groupId>
<artifactId>geo-utils</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<description>
Geo-Processing Utilities
</description>
<repositories>
<repository>
<id>confluent</id>
<url>https://packages.confluent.io/maven/</url>
</repository>
<repository>
<id>osgeo</id>
<name>Open Source Geospatial Foundation Repository</name>
<url>http://download.osgeo.org/webdav/geotools/</url>
</repository>
<repository>
<snapshots>
<enabled>true</enabled>
</snapshots>
<id>boundless</id>
<name>Boundless Maven Repository</name>
<url>http://repo.boundlessgeo.com/main</url>
</repository>
<repository>
<id>dev-azure-com-se-innovationprojects-barge-tracking</id>
<url>https://pkgs.dev.azure.com/SE-InnovationProjects/_packaging/barge-tracking/maven/v1</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>confluent</id>
<url>https://packages.confluent.io/maven/</url>
</pluginRepository>
</pluginRepositories>
<distributionManagement>
<repository>
<id>dev-azure-com-se-innovationprojects-barge-tracking</id>
<url>https://pkgs.dev.azure.com/SE-InnovationProjects/_packaging/barge-tracking/maven/v1</url>
</repository>
</distributionManagement>
<!-- Other properties such as kafka.version are derived from parent project(s) such as
https://github.com/confluentinc/common (see common's pom.xml for kafka.version).
-->
<properties>
<guava.version>24.1.1-jre</guava.version>
<geotools.version>23-SNAPSHOT</geotools.version>
<geohash.version>1.3.0</geohash.version>
<ksql.version>5.4.0</ksql.version>
<docker.skip-build>false</docker.skip-build>
<docker.skip-test>false</docker.skip-test>
<java.version>1.8</java.version>
<maven.shade.version>3.2.1</maven.shade.version>
<!-- JUnit 5 requires Surefire version 2.22.1 or higher -->
<maven.surefire.version>2.22.1</maven.surefire.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<dependency>
<groupId>com.github.davidmoten</groupId>
<artifactId>geo</artifactId>
<version>0.7.1</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>5.1.7.RELEASE</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.geotools</groupId>
<artifactId>gt-shapefile</artifactId>
<version>${geotools.version}</version>
</dependency>
<dependency>
<groupId>org.geotools</groupId>
<artifactId>gt-main</artifactId>
<version>${geotools.version}</version>
</dependency>
<dependency>
<groupId>org.geotools</groupId>
<artifactId>gt-epsg-hsql</artifactId>
<version>${geotools.version}</version>
</dependency>
<dependency>
<groupId>io.confluent.ksql</groupId>
<artifactId>ksql-udf</artifactId>
<version>${ksql.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.3</version>
<inherited>true</inherited>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
<compilerArgs>
<arg>-Xlint:all</arg>
<!--TODO: enable this once we have warnings under control<arg>-Werror</arg>-->
</compilerArgs>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>${maven.shade.version}</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>${maven.surefire.version}</version>
</plugin>
</plugins>
</build>
</project>
Когда я создаю проект Maven, успешно создается банка. Когда я пытаюсь перезапустить Confluent K SQL -Server с помощью команды "confluent local start k sql -server" K-1019 * -Server не запускается. Когда я проверяю журналы K SQL -Server с помощью команды "confluent log local k sql -server", я вижу следующее сообщение:
(io.confluent.ksql.rest.server.KsqlServerMain:61)
io.github.lukehutch.fastclasspathscanner.MatchProcessorException: java.lang.NoClassDefFoundError: org/locationtech/jts/io/ParseException
at io.github.lukehutch.fastclasspathscanner.MatchProcessorException.newInstance(MatchProcessorException.java:81)
at io.github.lukehutch.fastclasspathscanner.scanner.ScanSpec.callMatchProcessors(ScanSpec.java:757)
at io.github.lukehutch.fastclasspathscanner.FastClasspathScanner.scan(FastClasspathScanner.java:1606)
at io.github.lukehutch.fastclasspathscanner.FastClasspathScanner.scan(FastClasspathScanner.java:1678)
at io.github.lukehutch.fastclasspathscanner.FastClasspathScanner.scan(FastClasspathScanner.java:1704)
at io.confluent.ksql.function.UserFunctionLoader.loadFunctions(UserFunctionLoader.java:135)
at io.confluent.ksql.function.UserFunctionLoader.lambda$load$2(UserFunctionLoader.java:97)
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
Я написал еще один простой UDF, который работает и работает отлично в К SQL -Сервер. Кто-нибудь может помочь выяснить, в чем проблема с этим UDF?