Как получить данные второго фрейма данных для всех значений отдельных столбцов, значения которых совпадают в первом фрейме данных? - PullRequest
0 голосов
/ 05 апреля 2019

Имеют два кадра данных, как показано ниже

first_df
 |-- company_id: string (nullable = true)
 |-- max_dd: date (nullable = true)
 |-- min_dd: date (nullable = true)
 |-- mean: double (nullable = true)
 |-- count: long (nullable = false)

second_df 
 |-- company_id: string (nullable = true)
 |-- max_dd: date (nullable = true)
 |-- mean: double (nullable = true)
 |-- count: long (nullable = false)

У меня есть данные некоторых компаний в second_df. Мне нужно получить данные из second_df для тех идентификаторов компаний, которые перечислены в first_df.

что за свеча apis полезна для меня? Как я могу это сделать?

Спасибо.

Расширение вопроса:

Если нет сохраненных записей, first_df будет пустым. Следовательно first_df ("mean") & first_df ("count") будет нулевым, в результате "acc_new_mean" будет нулевым. В этом случае мне нужно установить "new_mean" как second_df ("среднее"), как это сделать? Я пытался так, но это не работает Любая подсказка, как обрабатывать здесь .withColumn ("new_mean", ...) ???

val acc_new_mean = (second_df("mean") + first_df("mean")) / (second_df("count") + first_df("count"))
    val acc_new_count  =  second_df("count") + first_df("count")


    val new_df = second_df.join(first_df.withColumnRenamed("company_id", "right_company_id").as("a"), 
                                 (  $"a.right_company_id"  === second_df("company_id") && ( second_df("min_dd")  > $"a.max_dd" ) ) 
                            , "leftOuter")
                            .withColumn("new_mean", if(acc_new_mean == null) lit(second_df("mean")) else  acc_new_mean )

Ответы [ 2 ]

2 голосов
/ 05 апреля 2019

ПОДХОД 1:

Если вам трудно соединить 2 фрейма данных с помощью API соединения фрейма, вы можете использовать sql, если вам удобно в sql. Для этого вы можете зарегистрировать 2 своих кадра данных в виде таблиц в искровой памяти и записать sql поверх этого.

second_df.registerTempTable("table_second_df")
first_df.registerTempTable("table_first_df")

val new_df = spark.sql("select distinct s.* from table_second_df s join table_first_df f on s.company_id=f.company_id")
new_df.show()

Как вы и просили, я добавил логику.

Считайте, что ваш first_df выглядит следующим образом:

+----------+----------+----------+----+-----+
|company_id|    max_dd|    min_dd|mean|count|
+----------+----------+----------+----+-----+
|         A|2019-04-05|2019-04-01|  10|  100|
|         A|2019-04-06|2019-04-02|  20|  200|
|         B|2019-04-08|2019-04-01|  30|  300|
|         B|2019-04-09|2019-04-02|  40|  400|
+----------+----------+----------+----+-----+

Считайте, что ваш second_df выглядит следующим образом:

+----------+----------+----+-----+
|company_id|    max_dd|mean|count|
+----------+----------+----+-----+
|         A|2019-04-03|  10|  100|
|         A|2019-04-02|  20|  200|
+----------+----------+----+-----+

Поскольку во второй таблице указан идентификатор компании A, я взял самую последнюю запись max_dd из second_df. Для идентификатора компании B его нет в second_df Я взял самую последнюю запись max_dd из first_df.

Пожалуйста, найдите код ниже.

first_df.registerTempTable("table_first_df")
second_df.registerTempTable("table_second_df")
val new_df = spark.sql("select company_id,max_dd,min_dd,mean,count from (select distinct s.company_id,s.max_dd,null as min_dd,s.mean,s.count,row_number() over (partition by s.company_id order by s.max_dd desc) rno from table_second_df s join table_first_df f on s.company_id=f.company_id) where rno=1 union select company_id,max_dd,min_dd,mean,count from (select distinct f.*,row_number() over (partition by f.company_id order by f.max_dd desc) rno from table_first_df f left join table_second_df s  on s.company_id=f.company_id where s.company_id is null) where rno=1")
new_df.show()

Ниже приведен результат:

enter image description here

ПОДХОД 2:

Вместо создания временной таблицы, как я упоминал в Approach 1, вы можете использовать join из dataframe's API. Это та же логика в Approach 1, но здесь я использую dataframe's API для достижения этой цели. Пожалуйста, не забудьте импортировать org.apache.spark.sql.expressions.Window, поскольку я использовал Window.patitionBy в приведенном ниже коде.

val new_df = second_df.as('s).join(first_df.as('f),$"s.company_id" === $"f.company_id","inner").drop($"min_dd").withColumn("min_dd",lit("")).select($"s.company_id", $"s.max_dd",$"min_dd", $"s.mean", $"s.count").dropDuplicates.withColumn("Rno", row_number().over(Window.partitionBy($"s.company_id").orderBy($"s.max_dd".desc))).filter($"Rno" === 1).drop($"Rno").union(first_df.as('f).join(second_df.as('s),$"s.company_id" === $"f.company_id","left_anti").select($"f.company_id", $"f.max_dd",$"f.min_dd", $"f.mean", $"f.count").dropDuplicates.withColumn("Rno", row_number().over(Window.partitionBy($"f.company_id").orderBy($"f.max_dd".desc))).filter($"Rno" === 1).drop($"Rno"))
new_df.show()

Ниже приведен результат:

enter image description here

Пожалуйста, дайте мне знать, если у вас есть какие-либо вопросы.

1 голос
/ 12 апреля 2019
 val acc_new_mean = //new mean literaal
 val acc_new_count  =   //new count literaal


          val resultDf = computed_df.join(accumulated_results_df.as("a"), 
                             (  $"company_id"  === computed_df("company_id")  ) 
                        , "leftOuter")
                        .withColumn("new_mean", when( acc_new_mean.isNull,lit(computed_df("mean")) ).otherwise(acc_new_mean) )
                        .withColumn("new_count", when( acc_new_count.isNull,lit(computed_df("count")) ).otherwise(acc_new_count) )
                         .select(
                            computed_df("company_id"),
                            computed_df("max_dd"),
                            col("new_mean").as("mean"),
                            col("new_count").as("count")
                          )
...