Spark Geo плитки присоединяются - PullRequest
0 голосов
/ 28 июня 2018

У меня проблема с выполнением моей искровой задачи.

У меня есть две таблицы:

  • геосетка с размерами ячеек 200х200 метров. Размер около 2 миллионов строк. Схема:

    cell_id minlat minlon maxlat maxlon

  • геообъекты. Размер около 200 тысяч строк. Схема:

    objid lat lon

Я хочу присоединиться к этим таблицам и найти ячейку для каждого объекта. Желаемая схема:

objid lat lon cell_id

Первое наивное решение:

  cellDF.join(objDF, callUDF("isContain", col("minlat"),..col("lat"), col("lon")));

где UDF просто проверить minlat <= lat <= maxlat && minlon <= lon <= maxlon

Но эти решения работают очень медленно. Несколько часов в кластере с 20+ узлами.

Второе, что я попробовал - используя esri-geometry-api . Я создал Polygon для каждой ячейки и Point для каждого объекта и проверил polygon.contains(point). Но это решение работает медленнее, чем первое.

Может быть, есть "лучшие практики" для таких соединений в искре? Я нашел некоторую информацию о QuadTree, но не нашел четкой документации и примеров для этого алгоритма в spark.

P.S. Версия Spark 2.2.0.

1 Ответ

0 голосов
/ 28 июня 2018

предположим, что у вас есть два CSV-файла (если это не так, вам нужно только изменить ввод)

// Create a spark session
SparkSession session = SparkSession.builder().appName("name here").getOrCreate();

// Create datasets for both input
Dataset<Fishnet> fishnet = session.read().format("csv").option("header", true).option("inferSchema", true).load("fishnet.csv").as(Encoders.bean(Fishnet.class));
Dataset<GeoObject> geoObject = session.read().format("csv").option("header", true).option("inferSchema", true).load("geoObject.csv").as(Encoders.bean(GeoObject.class));

// Create temp view on datasets
fishnet.createOrReplaceTempView("fishnet");
geoObject.createOrReplaceTempView("geoObject");

// Now create a query to retrieve the result [objid lat lon cell_id]
Dataset<Row> result = session.sql("select objid, lat, lon, cell_id from fishnet, geoObject where lat >= minlat and lat <= maxlat and lon >= minlon and lon <= maxlon");
...