spark - mimi c pyspark asDict () для scala без использования класса case - PullRequest
0 голосов
/ 12 июля 2020

Pyspark позволяет создавать словарь, когда из фрейма данных возвращается одна единственная строка, используя следующий подход.

t=spark.sql("SET").withColumn("rw",expr("row_number() over(order by key)")).collect()[0].asDict()
print(t)
print(t["key"])
print(t["value"])
print(t["rw"])
print("Printing using for comprehension")
[print(t[i]) for i in t ]

Results:

{'key': 'spark.app.id', 'value': 'local-1594577194330', 'rw': 1}
spark.app.id
local-1594577194330
1
Printing using for comprehension
spark.app.id
local-1594577194330
1

Я пытаюсь сделать то же самое в scala -spark. Это возможно с использованием подхода класса case.

case class download(key:String, value:String,rw:Long)

val t=spark.sql("SET").withColumn("rw",expr("row_number() over(order by key)")).as[download].first
println(t)
println(t.key)
println(t.value)
println(t.rw)

Результаты:

download(spark.app.id,local-1594580739413,1)
spark.app.id
local-1594580739413
1

В реальной проблеме у меня почти 200+ столбцов, и я не хочу использовать подход класса case. Я пытаюсь сделать что-то вроде ниже, чтобы избежать варианта класса case.

val df =spark.sql("SET").withColumn("rw",expr("row_number() over(order by key)"))

(df.columns).zip(df.take(1)(0))

, но получаю ошибку.

    <console>:28: error: type mismatch;
 found   : (String, String, Long)
 required: Iterator[?]
       (df.columns.toIterator).zip(df.take(1)(0))

Есть ли способ решить эту проблему.

Ответы [ 2 ]

1 голос
/ 13 июля 2020

В scala есть метод getValuesMap для преобразования row в Map[columnName: String, columnValue: T]. Попробуйте использовать то же, что и ниже -

  val df =spark.sql("SET").withColumn("rw",expr("row_number() over(order by key)"))
    df.show(false)
    df.printSchema()

    /**
      * +----------------------------+-------------------+---+
      * |key                         |value              |rw |
      * +----------------------------+-------------------+---+
      * |spark.app.id                |local-1594644271573|1  |
      * |spark.app.name              |TestSuite          |2  |
      * |spark.driver.host           |192.168.1.3        |3  |
      * |spark.driver.port           |58420              |4  |
      * |spark.executor.id           |driver             |5  |
      * |spark.master                |local[2]           |6  |
      * |spark.sql.shuffle.partitions|2                  |7  |
      * +----------------------------+-------------------+---+
      *
      * root
      * |-- key: string (nullable = false)
      * |-- value: string (nullable = false)
      * |-- rw: integer (nullable = true)
      */

    val map = df.head().getValuesMap(df.columns)
    println(map)
    println(map("key"))
    println(map("value"))
    println(map("rw"))
    println("Printing using for comprehension")
    map.foreach(println)

    /**
      * Map(key -> spark.app.id, value -> local-1594644271573, rw -> 1)
      * spark.app.id
      * local-1594644271573
      * 1
      * Printing using for comprehension
      * (key,spark.app.id)
      * (value,local-1594644271573)
      * (rw,1)
      */
1 голос
/ 13 июля 2020

Проблема в том, что zip - это метод коллекции, который может принимать только другой объект коллекции, который реализует IterableOnce , а df.take(1)(0) - это Spark SQL Row, что не t попадают в эту категорию.

Попробуйте преобразовать строку в Seq, используя метод toSeq.

df.columns.zip(df.take(1)(0).toSeq)

результат:

Array((key,spark.app.id), (value,local-1594577194330), (rw,1))
...