SQL Запрос и фрейм данных с использованием Spark / Java - PullRequest
0 голосов
/ 12 июля 2020

Я новичок в искре, и я застрял в том, как сделать запрос sql с использованием фрейма данных.

У меня есть два следующих фрейма данных.

df_zones
+-----------------+-----------------+----------------------+---------------------+
|id               |geomType         |geom                  |rayon                |
+-----------------+-----------------+----------------------+---------------------+
|30               |Polygon          |[00 00 00 00 01 0...] |200                  |
|32               |Point            |[00 00 00 00 01 0.. ] |320179               |
+-----------------+-----------------+----------------------+---------------------+
df_tracking
+-----------------+-----------------+----------------------+
|idZones         |Longitude        |Latitude              |               
+-----------------+-----------------+----------------------+
|[30,50,100,]     | -7.6198783      |33.5942549            |
|[20,140,39,]     |-7.6198783       |33.5942549            |
+-----------------+-----------------+----------------------+

Я хочу выполните следующий запрос.

"SELECT zones.* FROM zones WHERE zones.id IN ("
                            + idZones
                            + ") AND ((zones.geomType='Polygon' AND (ST_WITHIN(ST_GeomFromText(CONCAT('POINT(',"
                            + longitude
                            + ",' ',"
                            + latitude
                            + ",')'),4326),zones.geom))) OR (   (zones.geomType='LineString' OR zones.geomType='Point') AND  ST_Intersects(ST_buffer(zones.geom,(zones.rayon/100000)),ST_GeomFromText(CONCAT('POINT(',"
                            + longitude
                            + ",' ',"
                            + latitude
                            + ",')'),4326)))) "

Я действительно застрял, мне следует объединить два фрейма данных или что? Я попытался соединить два фрейма данных с id и idZone следующим образом:

     df_tracking.select(explode(col("idZones").as ("idZones"))).join(df_zones,col("idZones").equalTo(df_zones.col("id")));

, но мне кажется, что объединение - неправильный выбор.

Мне нужна ваша помощь.

Спасибо

1 Ответ

2 голосов
/ 12 июля 2020

Вы можете преобразовать df_tracking.idZones eg: [20, 140, 39] в тип Array() и использовать array_contains(), что упростит работу при объединении по диапазону элементов.

val joinDF = df_zones.join(df_tracking, array_contains($"id_Zones",$"id"))

Пример кода:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object JoinExample extends App{

val spark = SparkSession.builder()
    .master("local[8]")
    .appName("Example")
    .getOrCreate()


  import spark.implicits._

val df_zones = Seq(
      (30,"Polygon", "[00 00 00 00 01]",200),
      (32,"Point", "[00 00 00 00 01]",320179),
      (39,"Point", "[00 00 00 00 01]",320179)
      ).toDF("id","geomType","geom","rayon")

val df_tracking = Seq(
      (Array(30,50,100),"-7.6198783","33.5942549"),
      (Array(20,140,39),"-7.6198783","33.5942549"))
  .toDF("id_Zones","Longitude","Latitude")

  df_zones.show()
  df_tracking.show()


  val joinDF = df_zones.join(df_tracking, array_contains($"id_Zones",$"id"))
  joinDF.show()

Вывод:

+---+--------+----------------+------+
| id|geomType|            geom| rayon|
+---+--------+----------------+------+
| 30| Polygon|[00 00 00 00 01]|   200|
| 32|   Point|[00 00 00 00 01]|320179|
| 39|   Point|[00 00 00 00 01]|320179|
+---+--------+----------------+------+

+-------------+----------+----------+
|     id_Zones| Longitude|  Latitude|
+-------------+----------+----------+
|[30, 50, 100]|-7.6198783|33.5942549|
|[20, 140, 39]|-7.6198783|33.5942549|
+-------------+----------+----------+

+---+--------+----------------+------+-------------+----------+----------+
| id|geomType|            geom| rayon|     id_Zones| Longitude|  Latitude|
+---+--------+----------------+------+-------------+----------+----------+
| 30| Polygon|[00 00 00 00 01]|   200|[30, 50, 100]|-7.6198783|33.5942549|
| 39|   Point|[00 00 00 00 01]|320179|[20, 140, 39]|-7.6198783|33.5942549|
+---+--------+----------------+------+-------------+----------+----------+

Edit-1: В продолжение вышеизложенного запрос можно лучше всего преобразовать, определив SPARK UDF's ниже фрагмент кода, который дает вам краткое представление.

  // UDF Creation

  // Define Logic of (ST_WITHIN(ST_GeomFromText(CONCAT('POINT(', longitude, ' ', latitude, ')')
  // , 4326), zones.geom))
  val condition1 = (x:Int) => {1}

  // Define Logic of ST_Intersects(ST_buffer(zones.geom, (zones.rayon / 100000)),
  // ST_GeomFromText(CONCAT('POINT(', longitude, ' ', latitude, ')'), 4326))
  val condition2 = (y:Int) => {1}

  val condition1UDF = udf(condition1)
  val condition2UDF = udf(condition2)


  val joinDF = df_zones.join(df_tracking, array_contains($"id_Zones",$"id"))

  val finalDF = joinDF
      .withColumn("Condition1DerivedValue", condition1UDF(lit("000")))
      .withColumn("Condition2DerivedValue", condition2UDF(lit("000")))
      .filter(
        (col("geomType") === "Polygon" and col("Condition1DerivedValue") === 1 )
      or ((col("geomType")==="LineString" or col("geomType")==="Point")
          and $"Condition2DerivedValue" === 1
        )
      )
    .select("id","geomType","geom","rayon")

  finalDF.show()

Вывод:

+---+--------+----------------+------+
| id|geomType|            geom| rayon|
+---+--------+----------------+------+
| 30| Polygon|[00 00 00 00 01]|   200|
| 39|   Point|[00 00 00 00 01]|320179|
+---+--------+----------------+------+
...