I get an UnsupportedOperationException when using ST_Within in GeoSpark

I am using the ST_Within function in GeoSpark, but I get java.lang.UnsupportedOperationException: Unrecognized compression scheme type ID: 656. What is causing this?

I have tried other functions; ST_Intersects, for example, works fine. The stack trace is below:

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1455)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1443)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1442)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1670)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1625)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1614)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1935)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1948)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1961)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:333)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
  at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2371)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
  at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2765)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2370)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2377)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2113)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2112)
  at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2795)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2112)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2327)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:248)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:636)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:595)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:604)
  ... 48 elided
Caused by: java.lang.UnsupportedOperationException: Unrecognized compression scheme type ID: 656
  at org.apache.spark.sql.execution.columnar.compression.CompressionScheme$$anonfun$apply$1.apply(CompressionScheme.scala:71)
  at org.apache.spark.sql.execution.columnar.compression.CompressionScheme$$anonfun$apply$1.apply(CompressionScheme.scala:71)
  at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
  at scala.collection.AbstractMap.getOrElse(Map.scala:59)
  at org.apache.spark.sql.execution.columnar.compression.CompressionScheme$.apply(CompressionScheme.scala:71)
  at org.apache.spark.sql.execution.columnar.compression.CompressibleColumnAccessor$class.initialize(CompressibleColumnAccessor.scala:31)
  at org.apache.spark.sql.execution.columnar.NativeColumnAccessor.initialize(ColumnAccessor.scala:71)
  at org.apache.spark.sql.execution.columnar.ColumnAccessor$class.$init$(ColumnAccessor.scala:36)
  at org.apache.spark.sql.execution.columnar.BasicColumnAccessor.<init>(ColumnAccessor.scala:50)
  at org.apache.spark.sql.execution.columnar.NativeColumnAccessor.<init>(ColumnAccessor.scala:74)
  at org.apache.spark.sql.execution.columnar.StringColumnAccessor.<init>(ColumnAccessor.scala:100)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificColumnarIterator.hasNext(Unknown Source)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:829)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:829)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
  at org.apache.spark.scheduler.Task.run(Task.scala:100)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:325)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)

// Code starts here

import com.vividsolutions.jts.geom.Geometry
import org.apache.spark.sql.functions._ // upper, lit, round, hash, concat, cos, sin, pow used below
import org.datasyslab.geospark.spatialRDD.SpatialRDD
import org.datasyslab.geosparksql.utils.{Adapter, GeoSparkSQLRegistrator}
// (assumes spark-shell, where spark and the $ column syntax from spark.implicits._ are already available)
GeoSparkSQLRegistrator.registerAll(spark)
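
// The SparkSession construction is not shown in this post; `spark` above is assumed to come from
// spark-shell. As a minimal sketch (an assumption for illustration, not taken from the code here),
// the GeoSpark docs suggest building the session with the Kryo serializer and GeoSpark's Kryo
// registrator; these settings must be in place before the SparkContext starts:
//
//   import org.apache.spark.serializer.KryoSerializer
//   import org.apache.spark.sql.SparkSession
//   import org.datasyslab.geospark.serde.GeoSparkKryoRegistrator
//
//   val sparkWithKryo = SparkSession.builder()          // sparkWithKryo is a made-up name
//     .config("spark.serializer", classOf[KryoSerializer].getName)
//     .config("spark.kryo.registrator", classOf[GeoSparkKryoRegistrator].getName)
//     .getOrCreate()
//   GeoSparkSQLRegistrator.registerAll(sparkWithKryo)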

// Read the dataframe

val df = spark.read.parquet("/rawdata").filter(upper($"Market")===lit("XXXX")).filter(upper($"operator_name")===lit("XXXXX"))
df.registerTempTable("geo") // register "geo" for the query below (assumed; the original listing queries "geo" without showing this step)

// Build a ~200 m buffer around each point: project WGS84 (epsg:4326) to UTM zone 10N (epsg:32610),
// buffer by 200 (meters in that CRS), then project back to epsg:4326.
val spDf = spark.sql("select download_kbps,ST_Transform(ST_Buffer(ST_Transform(ST_Point(CAST(client_longitude AS Decimal(24,20)),CAST(client_latitude AS Decimal(24,20))),'epsg:4326','epsg:32610'),200),'epsg:32610','epsg:4326') as checkin from geo")
spDf.registerTempTable("geo1")
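
// A minimal isolation sketch (an assumption, not part of the original job): run the same
// ST_Within predicate on two tiny in-memory tables. If this succeeds, the failure is tied to
// the parquet-backed/cached data rather than to the function itself. pts/polys are made-up names.
import spark.implicits._
val pts = Seq((0.001, 0.001), (10.0, 10.0)).toDF("lon", "lat")
pts.registerTempTable("pts")
val polys = spark.sql("select ST_Buffer(ST_Point(CAST(0.0 AS Decimal(24,20)), CAST(0.0 AS Decimal(24,20))), 0.01) as poly")
polys.registerTempTable("polys")
spark.sql("select * from pts a join polys b where ST_Within(ST_Point(CAST(a.lon AS Decimal(24,20)), CAST(a.lat AS Decimal(24,20))), b.poly)").show()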

val cs = spark.read.parquet("/cspath")

val cs_1 = cs.filter($"lte_rsrp">(-141) && $"lte_rsrp" <(-40)).filter($"lte_rsrq">(-24) && $"lte_rsrq" <(-1)).
filter(upper($"network_name")===lit(operator) && upper($"tpim_market")===lit(market)).//USe OS FILTER AS WELL
select($"lte_rsrq".cast("double").as("rsrq"),$"lte_rsrp".cast("double").as("rsrp"),$"hour".cast("int"),$"hex56_center_lon".cast("double").as("client_longitude"),
$"hex56_center_lat".cast("double").as("client_latitude"),$"model",$"manufacturer")

val cs_2 = cs_1.withColumn("client_latitude_new",round(($"client_latitude"/100)*100,3)).//.drop("client_latitude").withColumnRenamed("latitude","client_latitude").
            withColumn("client_longitude_new",round(($"client_longitude"/100)*100,3)).//.drop("client_longitude").withColumnRenamed("longitude","client_longitude").
            withColumn("geoHash",hash($"client_latitude",$"client_longitude",lit(5))).//.drop("client_longitude").withColumnRenamed("longitude","client_longitude").
            withColumn("rssi",$"rsrp"+12).
            withColumn("prb",round((($"rssi"*$"rsrq")/$"rsrp"),3)).
            withColumn("xx",round((lit(6371)*cos($"client_latitude")*cos($"client_longitude")),6)).
            withColumn("yy",round((lit(6371)*cos($"client_latitude")*sin($"client_longitude")),6)).
            withColumn("zz",round((lit(6371)*sin($"client_latitude")),6)).
            withColumn("radial_r",round((pow($"client_longitude",lit(3))+pow($"client_latitude",lit(3))),6)).
            withColumn("rot45_X",round(((lit(.707)*$"client_longitude")-(lit(.707)*$"client_latitude")),6)).
            withColumn("rot30_X",round(((lit(0.866)*$"client_longitude")-(lit(0.5)*$"client_latitude")),6)).
            withColumn("rot60_X",round(((lit(0.5)*$"client_longitude")-(lit(0.866)*$"client_latitude")),6)).
            withColumn("newlatlong",concat(concat($"client_longitude_new",lit(",")),$"client_latitude_new")).na.drop()
cs_2.registerTempTable("crowdsource")

val cs_3 = spark.sql("select ST_Point(CAST(client_longitude AS Decimal(24,20)),CAST(client_latitude AS Decimal(24,20))) as check from crowdsource")

cs_3.registerTempTable("crowdsource1")

// The spatial join that causes the problem is performed here

val joinDf = spark.sql("select * from crowdsource1 a join geo1 b where ST_Within(a.check,b.checkin)")

joinDf.show

When I run a count action on this dataframe, the result is computed. But when I show it or write it to any file system, it throws the error above.
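
The frames under "Caused by" sit in Spark's in-memory columnar cache reader (org.apache.spark.sql.execution.columnar.*), so one thing I could sketch as a check (only a guess on my part; nothing in the job above shows an explicit cache) is dropping any cached data before retrying the action that fails:

spark.catalog.clearCache() // drop any in-memory columnar cache
val joinDf2 = spark.sql("select * from crowdsource1 a join geo1 b where ST_Within(a.check,b.checkin)") // joinDf2 is a made-up name for the re-run
joinDf2.count // this action succeeds as described above
joinDf2.show  // this action fails with the compression scheme error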

...