ПОДХОД 1:
Если вам трудно соединить 2 фрейма данных с помощью API соединения фрейма, вы можете использовать sql, если вам удобно в sql. Для этого вы можете зарегистрировать 2 своих кадра данных в виде таблиц в искровой памяти и записать sql поверх этого.
second_df.registerTempTable("table_second_df")
first_df.registerTempTable("table_first_df")
val new_df = spark.sql("select distinct s.* from table_second_df s join table_first_df f on s.company_id=f.company_id")
new_df.show()
Как вы и просили, я добавил логику.
Считайте, что ваш first_df
выглядит следующим образом:
+----------+----------+----------+----+-----+
|company_id| max_dd| min_dd|mean|count|
+----------+----------+----------+----+-----+
| A|2019-04-05|2019-04-01| 10| 100|
| A|2019-04-06|2019-04-02| 20| 200|
| B|2019-04-08|2019-04-01| 30| 300|
| B|2019-04-09|2019-04-02| 40| 400|
+----------+----------+----------+----+-----+
Считайте, что ваш second_df
выглядит следующим образом:
+----------+----------+----+-----+
|company_id| max_dd|mean|count|
+----------+----------+----+-----+
| A|2019-04-03| 10| 100|
| A|2019-04-02| 20| 200|
+----------+----------+----+-----+
Поскольку во второй таблице указан идентификатор компании A
, я взял самую последнюю запись max_dd
из second_df
. Для идентификатора компании B
его нет в second_df
Я взял самую последнюю запись max_dd
из first_df
.
Пожалуйста, найдите код ниже.
first_df.registerTempTable("table_first_df")
second_df.registerTempTable("table_second_df")
val new_df = spark.sql("select company_id,max_dd,min_dd,mean,count from (select distinct s.company_id,s.max_dd,null as min_dd,s.mean,s.count,row_number() over (partition by s.company_id order by s.max_dd desc) rno from table_second_df s join table_first_df f on s.company_id=f.company_id) where rno=1 union select company_id,max_dd,min_dd,mean,count from (select distinct f.*,row_number() over (partition by f.company_id order by f.max_dd desc) rno from table_first_df f left join table_second_df s on s.company_id=f.company_id where s.company_id is null) where rno=1")
new_df.show()
Ниже приведен результат:
ПОДХОД 2:
Вместо создания временной таблицы, как я упоминал в Approach 1
, вы можете использовать join
из dataframe's
API. Это та же логика в Approach 1
, но здесь я использую dataframe's
API для достижения этой цели. Пожалуйста, не забудьте импортировать org.apache.spark.sql.expressions.Window
, поскольку я использовал Window.patitionBy
в приведенном ниже коде.
val new_df = second_df.as('s).join(first_df.as('f),$"s.company_id" === $"f.company_id","inner").drop($"min_dd").withColumn("min_dd",lit("")).select($"s.company_id", $"s.max_dd",$"min_dd", $"s.mean", $"s.count").dropDuplicates.withColumn("Rno", row_number().over(Window.partitionBy($"s.company_id").orderBy($"s.max_dd".desc))).filter($"Rno" === 1).drop($"Rno").union(first_df.as('f).join(second_df.as('s),$"s.company_id" === $"f.company_id","left_anti").select($"f.company_id", $"f.max_dd",$"f.min_dd", $"f.mean", $"f.count").dropDuplicates.withColumn("Rno", row_number().over(Window.partitionBy($"f.company_id").orderBy($"f.max_dd".desc))).filter($"Rno" === 1).drop($"Rno"))
new_df.show()
Ниже приведен результат:
Пожалуйста, дайте мне знать, если у вас есть какие-либо вопросы.