Проблема, с которой вы сталкиваетесь, четко указана в сообщении об исключении - поскольку столбцы MapType
не могут быть ни хешируемыми, ни упорядочиваемыми, их нельзя использовать как часть выражения группировки или разделения.
Ваше решение SQL не является логически эквивалентным distinct
на Dataset
. Если вы хотите дедуплицировать данные на основе набора совместимых столбцов, вы должны использовать dropDuplicates
:
df.dropDuplicates("timestamp")
что будет эквивалентно
SELECT timestamp, first(c1) AS c1, first(c2) AS c2, ..., first(cn) AS cn,
first(canvasHashes) AS canvasHashes
FROM df GROUP BY timestamp
К сожалению, если ваша цель реальна DISTINCT
это будет не так просто. Возможное решение - использовать хэши Scala * Map
. Вы можете определить Scala udf
следующим образом:
spark.udf.register("scalaHash", (x: Map[String, String]) => x.##)
, а затем используйте его в своем коде Java для получения столбца, который можно использовать для dropDuplicates
:
df
.selectExpr("*", "scalaHash(canvasHashes) AS hash_of_canvas_hashes")
.dropDuplicates(
// All columns excluding canvasHashes / hash_of_canvas_hashes
"timestamp", "c1", "c2", ..., "cn"
// Hash used as surrogate of canvasHashes
"hash_of_canvas_hashes"
)
с эквивалентом SQL
SELECT
timestamp, c1, c2, ..., cn, -- All columns excluding canvasHashes
first(canvasHashes) AS canvasHashes
FROM df GROUP BY
timestamp, c1, c2, ..., cn -- All columns excluding canvasHashes
* Обратите внимание, что java.util.Map
с hashCode
не будет работать, так как hashCode
не соответствует.