С Spark / Scala есть ли способ объединиться со сложными структурами данных? - PullRequest
0 голосов
/ 16 апреля 2020

У меня есть фрейм данных со сложной структурой. Внизу внутри этой структуры мне нужно заменить значение другим на основе сопоставления из другого фрейма данных. В настоящее время мы достигли sh этого, взорвав фрейм данных, присоединившись, а затем выполнив группирование с агрегацией. Проблема в том, что мы превращаем записи 3.5B в записи 210B. Стоимость группировки очень высока. И я начинаю с данных, которые уже сгруппированы, как я хочу. Был ли какой-то способ выполнить это, не взорвавшись и не сгруппировавшись?

Вот пример кода из блокнота Zeppelin для иллюстрации нашего текущего метода:

import spark.implicits._

case class A(device_id: Long, cluster: Seq[B])
case class B(location_id: Long, score: Double)
case class C(location_id: Long, location_key: String)
case class D(location_key: String, score: Double)

val df1 = Seq(
  A(1L, Seq(B(1L, 1.1), B(2L, 2.2), B(3L, 3.3))),
  A(2L, Seq(B(4L, 4.4), B(5L, 5.5), B(6L, 6.6))),
  A(3L, Seq(B(7L, 7.7), B(8L, 8.8), B(9L, 9.9)))
).toDF

val df2 = Seq(
  C(1L, "a"),
  C(2L, "b"),
  C(3L, "c"),
  C(4L, "d"),
  C(5L, "e"),
  C(6L, "f"),
  C(7L, "g"),
  C(8L, "h"),
  C(9L, "i")
).toDF

val df3 = df1
  .select($"device_id", explode($"cluster").as("record"))
  .select($"device_id", $"record.location_id".as("location_id"), $"record.score".as("score"))

val df4 = df3
  .join(df2, "location_id")
  .select($"device_id", $"location_key", $"score")

val df5 = df4
  .groupBy($"device_id")
  .agg(
    collect_list(struct($"location_key", $"score")).as("cluster")
  )

df1.printSchema()
df1.show(3, false)

df5.printSchema()
df5.show(3, false)

Вывод выглядит так:

root
 |-- device_id: long (nullable = false)
 |-- cluster: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- location_id: long (nullable = false)
 |    |    |-- score: double (nullable = false)

+---------+------------------------------+
|device_id|cluster                       |
+---------+------------------------------+
|1        |[[1, 1.1], [2, 2.2], [3, 3.3]]|
|2        |[[4, 4.4], [5, 5.5], [6, 6.6]]|
|3        |[[7, 7.7], [8, 8.8], [9, 9.9]]|
+---------+------------------------------+

root
 |-- device_id: long (nullable = false)
 |-- cluster: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- location_key: string (nullable = true)
 |    |    |-- score: double (nullable = true)

+---------+------------------------------+
|device_id|cluster                       |
+---------+------------------------------+
|1        |[[a, 1.1], [c, 3.3], [b, 2.2]]|
|2        |[[e, 5.5], [d, 4.4], [f, 6.6]]|
|3        |[[g, 7.7], [h, 8.8], [i, 9.9]]|
+---------+------------------------------+

Ответы [ 2 ]

0 голосов
/ 23 апреля 2020

Похоже, что нет решения. По крайней мере, не тот, который работал для меня. Я попытался преобразовать df в набор данных и сопоставить его. Внутри карты я пытался создать фрейм данных, выполнить объединение, собрать и вернуть новый класс дел. Это не работает Единственное, что сработало, это преобразование df2 в Scala карту, а затем создание карты по набору данных, где я мог найти значение замены на карте. Проблема в том, что моя настоящая жизнь df2 слишком велика, чтобы уместиться в память драйвера.

0 голосов
/ 18 апреля 2020

Я не уверен, насколько это поможет вам в проблеме производительности, не говоря уже о том, чтобы ответить на ваш вопрос. Но шаг разнесения можно изменить, чтобы использовать flatMap, который может немного увеличить вашу производительность (хотя я думаю, что explode и flatMap должны быть довольно похожими, поэтому разница в производительности также может отсутствовать):

import org.apache.spark.sql.SparkSession                                                                                                  
import org.apache.spark.sql.functions.{ collect_list, struct}                                                                             

case class A(device_id: Long, cluster: Seq[B])                                                                                            
case class B(location_id: Long, score: Double)                                                                                            
case class C(location_id: Long, location_key: String)                                                                                     
case class D(location_key: String, score: Double)                                                                                         
case class E(device_id: Long, location_id: Long, score: Double)                                                                           

object ComplexDataStructures {                                                                                                            
  def main(args: Array[String]): Unit = {                                                                                                 

    val spark = SparkSession                                                                                                              
      .builder()                                                                                                                          
      .appName("Spark SQL basic example")                                                                                                 
      .config("spark.master", "local")                                                                                                    
      .getOrCreate()                                                                                                                      

    import spark.implicits._                                                                                                              

    val df1 = Seq(                                                                                                                        
      A(1L, Seq(B(1L, 1.1), B(2L, 2.2), B(3L, 3.3))),                                                                                     
      A(2L, Seq(B(4L, 4.4), B(5L, 5.5), B(6L, 6.6))),                                                                                     
      A(3L, Seq(B(7L, 7.7), B(8L, 8.8), B(9L, 9.9)))                                                                                      
    ).toDS.as[A]                                                                                                                          

    val df2 = Seq(                                                                                                                        
      C(1L, "a"),                                                                                                                         
      C(2L, "b"),                                                                                                                         
      C(3L, "c"),                                                                                                                         
      C(4L, "d"),                                                                                                                         
      C(5L, "e"),                                                                                                                         
      C(6L, "f"),                                                                                                                         
      C(7L, "g"),                                                                                                                         
      C(8L, "h"),                                                                                                                         
      C(9L, "i")                                                                                                                          
    ).toDS.as[C]                                                                                                                          


    val df3 = df1.flatMap{ case A(a,b) => b.map((a,_)).map( x => E(x._1, x._2.location_id, x._2.score)) }                                 

    val df4 = df3.join(df2, df3("location_id") === df2("location_id")).select(df3("device_id"), df2("location_key"), df3("score"))        

    val df5 = df4.groupBy("device_id").agg(collect_list(struct("location_key","score")).as("cluster"))                                    

    df5.printSchema()                                                                                                                     
    df5.show(3, false)                                                                                                                    

  }                                                                                                                                       
}      
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...