Spark DataFrame to Dict - ошибка элемента последовательности обновления словаря - PullRequest
0 голосов
/ 09 октября 2018

Я пытаюсь преобразовать DataFrame как Dict, используя функцию collectAsMap() в RDD.

Код:

dict = df.rdd.collectAsMap()

Журнал ошибок:

ValueError: dictionary update sequence element #0 has length 8; 2 is required

Обновление:

DF имеет 8 полей, означает ли это, что collectAsMap() может использоваться только DF с двумяполя

Ответы [ 2 ]

0 голосов
/ 11 декабря 2018

ниже - объяснение того же действия в pyspark.Я согласен с объяснением Рама.collectAsMap применимо только к pairedrdd, поэтому вам нужно сначала преобразовать ваш фрейм данных в пару rdd, а затем преобразовать его в некоторый словарь с помощью функции collectAsMap.

Например, у меня есть следующий фрейм данных:

df = spark.sql("""select emp_id,emp_city from udb.temptable_1 order by emp_id""");
+------+--------+
|emp_id|emp_city|
+------+--------+
|     1|NOIDA   |
|     2|GURGAON |
|     3|DWARKA  |
|     4|SAKET   |
|     5|USA     |
|     6|UP      |
|     7|NOIDA   |
|     8|SAKET   |
|     9|GURGAON |
+------+--------+

преобразование его в пару ключ-значение rdd

newrdd = df.rdd.map(lambda x : (x[0],x))

>>> type(newrdd)
<class 'pyspark.rdd.PipelinedRDD'>

[(1, Row(emp_id=1, emp_city=u'NOIDA   ')), 
(2, Row(emp_id=2, emp_city=u'GURGAON ')), 
(3, Row(emp_id=3, emp_city=u'DWARKA  ')), 
(4, Row(emp_id=4, emp_city=u'SAKET   ')), 
(5, Row(emp_id=5, emp_city=u'USA     ')), 
(6, Row(emp_id=6, emp_city=u'UP      ')), 
(7, Row(emp_id=7, emp_city=u'NOIDA   ')), 
(8, Row(emp_id=8, emp_city=u'SAKET   ')), 
(9, Row(emp_id=9, emp_city=u'GURGAON '))]

наконец, вы можете использовать collectAsMap для преобразования вашей пары ключ-значение rdd в dict

dict = newrdd.collectAsMap()

{1: Row(emp_id=1, emp_city=u'NOIDA   '), 
2: Row(emp_id=2, emp_city=u'GURGAON '), 
3: Row(emp_id=3, emp_city=u'DWARKA  '), 
4: Row(emp_id=4, emp_city=u'SAKET   '), 
5: Row(emp_id=5, emp_city=u'USA     '), 
6: Row(emp_id=6, emp_city=u'UP      '), 
7: Row(emp_id=7, emp_city=u'NOIDA   '), 
8: Row(emp_id=8, emp_city=u'SAKET   '), 
9: Row(emp_id=9, emp_city=u'GURGAON ')}

>>> dict.keys()
[1, 2, 3, 4, 5, 6, 7, 8, 9]

>>> dict.get(2)
Row(emp_id=2, emp_city=u'GURGAON ')
0 голосов
/ 02 декабря 2018

Во-первых, я плох в python / pyspark, поэтому я использовал демо-версию, используя scala ...

collectAsMap применимо только для pairedRDD (см. Ниже код из spark doc / код базы)

/**
       * Return the key-value pairs in this RDD to the master as a Map.
       *
       * Warning: this doesn't return a multimap (so if you have multiple values to the same key, only
       *          one value per key is preserved in the map returned)
       *
       * @note this method should only be used if the resulting data is expected to be small, as
       * all the data is loaded into the driver's memory.
       */
      def collectAsMap(): Map[K, V] = self.withScope {
        val data = self.collect()
        val map = new mutable.HashMap[K, V]
        map.sizeHint(data.length)
        data.foreach { pair => map.put(pair._1, pair._2) }
        map
      }

ваш df.rdd эквивалентен RDD[Row], поскольку вы конвертируете DataFrame в RDD.

, чтобы вы могли * НЕ* собрать его как карту.если вам не нужно делать keyBy ( Создает кортежи элементов в этом СДР, применяя f) к любому элементу в строке.или какая-либо другая операция, которая преобразует это в парный СДР.

Ниже приведен полный пример, демонстрирующий это.


import org.apache.log4j.{Level, Logger}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession

/** *
  * collectAsMap is only applicable to pairedrdd if you want to do a map then you can do a rdd key by and proceed
  *
  * @author : Ram Ghadiyaram
  */
object PairedRDDPlay extends Logging {
  Logger.getLogger("org").setLevel(Level.OFF)
  // Logger.getLogger("akka").setLevel(Level.OFF)

  def main(args: Array[String]): Unit = {
    val appName = if (args.length > 0) args(0) else this.getClass.getName
    val spark: SparkSession = SparkSession.builder
      .config("spark.master", "local") //.config("spark.eventLog.enabled", "true")
      .appName(appName)
      .getOrCreate()
    import spark.implicits._
    val pairs = spark.sparkContext.parallelize(Array((1, 1,3), (1, 2,3), (1, 3,3), (1, 1,3), (2, 1,3))).toDF("mycol1", "mycol2","mycol3")
    pairs.show()

    val keyedBy = pairs.rdd.keyBy(_.getAs[Int]("mycol1"))
    keyedBy.foreach(x => println("using keyBy-->>" + x))
    val myMap = keyedBy.collectAsMap()
    println(myMap.toString())
    assert(myMap.size == 2)
    //    val myMap1 = pairs.rdd.collectAsMap()
    //    println(myMap1.toString())
    //    assert(myMap1.size == 2)
    //Error:(28, 28) value collectAsMap is not a member of org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
    //    val myMap1 = pairs.rdd.collectAsMap()
  }
}

Результат:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
+------+------+------+
|mycol1|mycol2|mycol3|
+------+------+------+
|     1|     1|     3|
|     1|     2|     3|
|     1|     3|     3|
|     1|     1|     3|
|     2|     1|     3|
+------+------+------+

using keyBy-->>(1,[1,1,3])
using keyBy-->>(1,[1,2,3])
using keyBy-->>(1,[1,3,3])
using keyBy-->>(1,[1,1,3])
using keyBy-->>(2,[2,1,3])
Map(2 -> [2,1,3], 1 -> [1,1,3])

Вопрос: DF имеет 8 полей, означает ли это, что collectAsMap () можно использовать только DF с двумя полями?


Ответ: НЕТ, вы можете увидеть пример с несколькими столбцами (т.е.> 2) в примере.но вам нужно преобразовать его в pairrdd.

Также взгляните на Как работает сборник-карта-работа-работа-для-искры-API

...