Spark Dataframe: выбор отдельных строк - PullRequest
1 голос
/ 05 марта 2019

Я пробовал два способа найти отдельные строки из паркета, но, похоже, это не сработало. Попытка 1: Dataset<Row> df = sqlContext.read().parquet("location.parquet").distinct();Но выдает

Cannot have map type columns in DataFrame which calls set operations
(intersect, except, etc.), 
but the type of column canvasHashes is map<string,string>;;

Попытка 2: Попытка выполнения запросов sql:

Dataset<Row> df = sqlContext.read().parquet("location.parquet");
    rawLandingDS.createOrReplaceTempView("df");
    Dataset<Row> landingDF = sqlContext.sql("SELECT distinct on timestamp * from df");

Ошибка, которую я получаю:

= SQL ==
SELECT distinct on timestamp * from df
-----------------------------^^^

Есть лиспособ получить отчетливые записи при чтении файлов паркета?Любой вариант чтения, который я могу использовать.

Ответы [ 3 ]

3 голосов
/ 06 марта 2019

Проблема, с которой вы сталкиваетесь, четко указана в сообщении об исключении - поскольку столбцы MapType не могут быть ни хешируемыми, ни упорядочиваемыми, их нельзя использовать как часть выражения группировки или разделения.

Ваше решение SQL не является логически эквивалентным distinct на Dataset. Если вы хотите дедуплицировать данные на основе набора совместимых столбцов, вы должны использовать dropDuplicates:

df.dropDuplicates("timestamp")

что будет эквивалентно

SELECT timestamp, first(c1) AS c1, first(c2) AS c2,  ..., first(cn) AS cn,
       first(canvasHashes) AS canvasHashes
FROM df GROUP BY timestamp

К сожалению, если ваша цель реальна DISTINCT это будет не так просто. Возможное решение - использовать хэши Scala * Map. Вы можете определить Scala udf следующим образом:

spark.udf.register("scalaHash", (x: Map[String, String]) => x.##)

, а затем используйте его в своем коде Java для получения столбца, который можно использовать для dropDuplicates:

 df
  .selectExpr("*", "scalaHash(canvasHashes) AS hash_of_canvas_hashes")
  .dropDuplicates(
    // All columns excluding canvasHashes / hash_of_canvas_hashes
    "timestamp",  "c1", "c2", ..., "cn" 
    // Hash used as surrogate of canvasHashes
    "hash_of_canvas_hashes"         
  )

с эквивалентом SQL

SELECT 
  timestamp, c1, c2, ..., cn,   -- All columns excluding canvasHashes
  first(canvasHashes) AS canvasHashes
FROM df GROUP BY
  timestamp, c1, c2, ..., cn    -- All columns excluding canvasHashes

* Обратите внимание, что java.util.Map с hashCode не будет работать, так как hashCode не соответствует.

2 голосов
/ 05 марта 2019

Да, синтаксис неверный, он должен быть:

Dataset<Row> landingDF = sqlContext.sql("SELECT distinct * from df");
0 голосов
/ 06 марта 2019

1) Если вы хотите различать по колунам, вы можете использовать его

val df = sc.parallelize(Array((1, 2), (3, 4), (1, 6))).toDF("no", "age")


scala> df.show
+---+---+
| no|age|
+---+---+
|  1|  2|
|  3|  4|
|  1|  6|
+---+---+

val distinctValuesDF = df.select(df("no")).distinct

scala> distinctValuesDF.show
+---+
| no|
+---+
|  1|
|  3|
+---+

2) Если вы хотите, чтобы уникальный для всех столбцов использовался dropduplicate

scala> val df = sc.parallelize(Array((1, 2), (3, 4),(3, 4), (1, 6))).toDF("no", "age")



scala> df.show

+---+---+
| no|age|
+---+---+
|  1|  2|
|  3|  4|
|  3|  4|
|  1|  6|
+---+---+


scala> df.dropDuplicates().show()
+---+---+
| no|age|
+---+---+
|  1|  2|
|  3|  4|
|  1|  6|
+---+---+
...