Заполнение пустого поля в DataFrame с предыдущим значением поля - PullRequest
0 голосов
/ 19 марта 2019

Я работаю со Scala и Spark, и я относительно новичок в программировании на Scala, поэтому, возможно, у моего вопроса есть простое решение.

У меня есть один DataFrame, который хранит информацию об активных и деактивированных клиентах в некоторыхпродвижение.Этот DataFrame показывает Идентификатор клиента, действие, которое он / она предпринял (он может активировать или деактивировать в процессе продвижения в любое время) и Дата, когда он или она предприняло это действие.Вот пример этого формата:

Пример работы DataFrame
Example of how the DataFrame works

Я хочу ежедневный мониторинг клиентов, которые активны иЯ хочу посмотреть, как это число меняется в разные дни, но я не могу кодировать что-либо, что работает так.

Моя идея состояла в том, чтобы создать перекрестную комбинацию из двух Dataframes;один, который имеет только идентификаторы клиента, а другой - только даты, поэтому у меня были бы все даты, связанные со всеми идентификаторами клиентов, и мне нужно было только видеть статус клиента в каждой из дат (если клиент активен или неактивен),Поэтому после этого я сделал левое соединение этих новых Dataframe с DataFrame, который связывал идентификатор клиента и события, но в результате получилось много дат, имеющих статус «ноль», и я не знаю, как его заполнитьправильный статус.Вот пример:

Пример окончательного фрейма данных
Example of the final DataFrame

Я уже пытался использовать функцию задержки, но она не решила мою проблему,У кого-нибудь есть идеи, которые могут мне помочь?

Спасибо!

1 Ответ

0 голосов
/ 21 марта 2019

Немного дорогая операция из-за того, что Spark SQL имеет ограничения на коррелированные подзапросы с <, <=>,> =.

Начиная со второго фрейма данных с NULL и предполагая, что достаточно большая система и объем управляемых данных:

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

// My sample input
val df  = Seq( 
  (1,"2018-03-12", "activate"),
  (1,"2018-03-13", null),
  (1,"2018-03-14", null),
  (1,"2018-03-15", "deactivate"),
  (1,"2018-03-16", null),
  (1,"2018-03-17", null),
  (1,"2018-03-18", "activate"), 
  (2,"2018-03-13", "activate"), 
  (2,"2018-03-14", "deactivate"), 
  (2,"2018-03-15", "activate") 
 ).toDF("ID", "dt", "act")
//df.show(false)

val w = Window.partitionBy("ID").orderBy(col("dt").asc)
val df2 = df.withColumn("rank", dense_rank().over(w)).select("ID", "dt","act", "rank") //.where("rank == 1")
//df2.show(false)

val df3 = df2.filter($"act".isNull)
//df3.show(false)

val df4 = df2.filter(!($"act".isNull)).toDF("ID2", "dt2", "act2", "rank2")
//df4.show(false)

val df5 = df3.join(df4, (df3("ID") === df4("ID2")) && (df4("rank2") < df3("rank")),"inner") 
//df5.show(false)

val w2 = Window.partitionBy("ID", "rank").orderBy(col("rank2").desc)
val df6 = df5.withColumn("rank_final", dense_rank().over(w2)).where("rank_final == 1").select("ID", "dt","act2").toDF("ID", "dt", "act") 
//df6.show

val df7 = df.filter(!($"act".isNull))

val dfFinal = df6.union(df7)
dfFinal.show(false)

возвращается:

+---+----------+----------+
|ID |dt        |act       |
+---+----------+----------+
|1  |2018-03-13|activate  |
|1  |2018-03-14|activate  |
|1  |2018-03-16|deactivate|
|1  |2018-03-17|deactivate|
|1  |2018-03-12|activate  |
|1  |2018-03-15|deactivate|
|1  |2018-03-18|activate  |
|2  |2018-03-13|activate  |
|2  |2018-03-14|deactivate|
|2  |2018-03-15|activate  |
+---+----------+----------+

Я решил это пошагово и в спешке, но не так очевидно.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...