scala - распространять rdd используя список карт - PullRequest
0 голосов
/ 10 октября 2018

Я хочу распространить rdd, используя карту списка.

входная выборка

Log("key1", "key2", "key3", Map(tk1 -> tv1, tk2 -> tv2, tk3 -> tv3))

И выходная выборка, которую я хочу:

RDD[(String, String, String, String, String)]
("key1", "key2", "key3", "tk1", "tv1")
("key1", "key2", "key3", "tk2", "tv2")
("key1", "key2", "key3", "tk3", "tv3")

Наконец, я хочувыполнить операцию уменьшения, как показано ниже.Но это не работает.

val mapCnt = logs.map(log => {
  log.textMap.foreach { tmap =>
    var tkey = tmap._1
    var tvalue = tmap._2
  }
  ((log.key1, log.key2, log.key3, tkey, tvalue), 1L)
}).reduceByKey(_ + _)

Вот входной объект, который я использовал.

case class Log(
            val key1: String,
            val key2: String,
            val key3: String,
            val TextMap: Map[String, String]
          ) 

Как мне преобразовать это?

Спасибо за вашу помощь.

Ответы [ 2 ]

0 голосов
/ 10 октября 2018

Я не уверен во второй части, но ниже DF решение для первой части.

scala> case class Log(
     |             val key1: String,
     |             val key2: String,
     |             val key3: String,
     |             val TextMap: Map[String, String]
     |           )
defined class Log

scala> val df = Seq(Log("key1", "key2", "key3", Map("tk1" -> "tv1", "tk2" -> "tv2", "tk3" -> "tv3"))).toDF().as[Log]
df: org.apache.spark.sql.Dataset[Log] = [key1: string, key2: string ... 2 more fields]

scala> val df2 = df.withColumn("mapk",map_keys('TextMap))
df2: org.apache.spark.sql.DataFrame = [key1: string, key2: string ... 3 more fields]

scala> val df3 = df2.select('key1,'key2,'key3,'TextMap,'mapk, explode('mapk).as("exp1")).withColumn("exp2",('Textmap)('exp1)).drop("TextMap","mapk")
df3: org.apache.spark.sql.DataFrame = [key1: string, key2: string ... 3 more fields]

scala> df3.show
+----+----+----+----+----+
|key1|key2|key3|exp1|exp2|
+----+----+----+----+----+
|key1|key2|key3| tk1| tv1|
|key1|key2|key3| tk2| tv2|
|key1|key2|key3| tk3| tv3|
+----+----+----+----+----+


scala> df3.printSchema
root
 |-- key1: string (nullable = true)
 |-- key2: string (nullable = true)
 |-- key3: string (nullable = true)
 |-- exp1: string (nullable = true)
 |-- exp2: string (nullable = true)


scala>
0 голосов
/ 10 октября 2018

Вы вычисляете результаты в foreach и сразу отбрасываете их.Кроме того, значения выходят за рамки.Лучше всего использовать flatMap здесь.

val mapCnt = logs.flatMap(log => {
  for { 
    (tkey, tvalue) <- tmap
  } yield ((log.key1, log.key2, log.key3, tkey, tvalue), 1L)
}).reduceByKey(_ + _)
...