Определение повторяющихся значений столбца над окном (Scala) - PullRequest
2 голосов
/ 21 марта 2019

У меня есть фрейм данных с двумя столбцами: «ID» и «Amount», каждая строка представляет транзакцию с определенным идентификатором и суммой транзакции. В моем примере используется следующий DF:

val df = sc.parallelize(Seq((1, 120),(1, 120),(2, 40),
  (2, 50),(1, 30),(2, 120))).toDF("ID","Amount")

Я хочу создать новый столбец, определяющий, является ли указанная сумма повторяющимся значением, т. Е. Происходит в любой другой транзакции с тем же идентификатором или нет.

Я нашел способ сделать это более широко, то есть по всему столбцу «Сумма», без учета идентификатора, используя следующую функцию:

def recurring_amounts(df: DataFrame, col: String) : DataFrame = {
  var df_to_arr = df.select(col).rdd.map(r => r(0).asInstanceOf[Double]).collect()
  var arr_to_map = df_to_arr.groupBy(identity).mapValues(_.size)
  var map_to_df = arr_to_map.toSeq.toDF(col, "Count")
  var df_reformat = map_to_df.withColumn("Amount", $"Amount".cast(DoubleType))
  var df_out = df.join(df_reformat, Seq("Amount"))
  return df_new
}

val df_output = recurring_amounts(df, "Amount")

Возвращает:

+---+------+-----+
|ID |Amount|Count|
+---+------+-----+
| 1 | 120  |  3  |
| 1 | 120  |  3  |
| 2 |  40  |  1  |
| 2 |  50  |  1  | 
| 1 |  30  |  1  |
| 2 | 120  |  3  |
+---+------+-----+

, который я затем могу использовать для создания желаемой двоичной переменной, чтобы указать, является ли сумма повторяющейся или нет (да, если> 1, иначе нет).

Однако моя проблема в этом примере иллюстрируется значением 120, которое повторяется для идентификатора 1, но не для идентификатора 2. Поэтому мой желаемый результат:

 +---+------+-----+
|ID |Amount|Count|
+---+------+-----+
| 1 | 120  |  2  |
| 1 | 120  |  2  |
| 2 |  40  |  1  |
| 2 |  50  |  1  | 
| 1 |  30  |  1  |
| 2 | 120  |  1  |
+---+------+-----+

Я пытался придумать, как применить функцию, используя .over(Window.partitionBy("ID") но не уверен, как это сделать. Любые советы будут высоко оценены.

1 Ответ

0 голосов
/ 23 марта 2019

Если вы хороши в SQL, вы можете написать SQL запрос для вашего Dataframe. Первое, что вам нужно сделать, это зарегистрировать ваш Dataframe как таблицу в памяти искры. После этого вы можете написать SQL в верхней части таблицы. Обратите внимание, что spark является переменной сеанса искры.

val df = sc.parallelize(Seq((1, 120),(1, 120),(2, 40),(2, 50),(1, 30),(2, 120))).toDF("ID","Amount")
df.registerTempTable("transactions")
spark.sql("select *,count(*) over(partition by ID,Amount) as Count from transactions").show()

enter image description here

Пожалуйста, дайте мне знать, если у вас есть какие-либо вопросы.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...