Во-первых, я плох в python / pyspark, поэтому я использовал демо-версию, используя scala ...
collectAsMap
применимо только для pairedRDD (см. Ниже код из spark doc / код базы)
/**
* Return the key-value pairs in this RDD to the master as a Map.
*
* Warning: this doesn't return a multimap (so if you have multiple values to the same key, only
* one value per key is preserved in the map returned)
*
* @note this method should only be used if the resulting data is expected to be small, as
* all the data is loaded into the driver's memory.
*/
def collectAsMap(): Map[K, V] = self.withScope {
val data = self.collect()
val map = new mutable.HashMap[K, V]
map.sizeHint(data.length)
data.foreach { pair => map.put(pair._1, pair._2) }
map
}
ваш df.rdd
эквивалентен RDD[Row]
, поскольку вы конвертируете DataFrame
в RDD
.
, чтобы вы могли * НЕ* собрать его как карту.если вам не нужно делать keyBy
( Создает кортежи элементов в этом СДР, применяя f
) к любому элементу в строке.или какая-либо другая операция, которая преобразует это в парный СДР.
Ниже приведен полный пример, демонстрирующий это.
import org.apache.log4j.{Level, Logger}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
/** *
* collectAsMap is only applicable to pairedrdd if you want to do a map then you can do a rdd key by and proceed
*
* @author : Ram Ghadiyaram
*/
object PairedRDDPlay extends Logging {
Logger.getLogger("org").setLevel(Level.OFF)
// Logger.getLogger("akka").setLevel(Level.OFF)
def main(args: Array[String]): Unit = {
val appName = if (args.length > 0) args(0) else this.getClass.getName
val spark: SparkSession = SparkSession.builder
.config("spark.master", "local") //.config("spark.eventLog.enabled", "true")
.appName(appName)
.getOrCreate()
import spark.implicits._
val pairs = spark.sparkContext.parallelize(Array((1, 1,3), (1, 2,3), (1, 3,3), (1, 1,3), (2, 1,3))).toDF("mycol1", "mycol2","mycol3")
pairs.show()
val keyedBy = pairs.rdd.keyBy(_.getAs[Int]("mycol1"))
keyedBy.foreach(x => println("using keyBy-->>" + x))
val myMap = keyedBy.collectAsMap()
println(myMap.toString())
assert(myMap.size == 2)
// val myMap1 = pairs.rdd.collectAsMap()
// println(myMap1.toString())
// assert(myMap1.size == 2)
//Error:(28, 28) value collectAsMap is not a member of org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
// val myMap1 = pairs.rdd.collectAsMap()
}
}
Результат:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
+------+------+------+
|mycol1|mycol2|mycol3|
+------+------+------+
| 1| 1| 3|
| 1| 2| 3|
| 1| 3| 3|
| 1| 1| 3|
| 2| 1| 3|
+------+------+------+
using keyBy-->>(1,[1,1,3])
using keyBy-->>(1,[1,2,3])
using keyBy-->>(1,[1,3,3])
using keyBy-->>(1,[1,1,3])
using keyBy-->>(2,[2,1,3])
Map(2 -> [2,1,3], 1 -> [1,1,3])
Вопрос: DF имеет 8 полей, означает ли это, что collectAsMap
() можно использовать только DF с двумя полями?
Ответ: НЕТ, вы можете увидеть пример с несколькими столбцами (т.е.> 2) в примере.но вам нужно преобразовать его в pairrdd.
Также взгляните на Как работает сборник-карта-работа-работа-для-искры-API