Используйте Window для подсчета строк с условием if в scala 2 - PullRequest
0 голосов
/ 07 декабря 2018

Я уже публиковал похожий вопрос, но кто-то дал мне хитрость, чтобы не использовать условие "if".

Здесь я нахожусь в подобной позиции, и я не нахожу никакой хитрости, чтобы избежать этого ....

У меня есть датафрейм.

var df = sc.parallelize(Array(
(1,  "2017-06-29 10:53:53.0","2017-06-25 14:60:53.0","boulanger.fr"), 
(2,  "2017-07-05 10:48:57.0","2017-09-05 08:60:53.0","patissier.fr"), 
(3,  "2017-06-28 10:31:42.0","2017-02-28 20:31:42.0","boulanger.fr"), 
(4,  "2017-08-21 17:31:12.0","2017-10-21 10:29:12.0","patissier.fr"), 
(5,  "2017-07-28 11:22:42.0","2017-05-28 11:22:42.0","boulanger.fr"), 
(6,  "2017-08-23 17:03:43.0","2017-07-23 09:03:43.0","patissier.fr"), 
(7,  "2017-08-24 16:08:07.0","2017-08-22 16:08:07.0","boulanger.fr"), 
(8,  "2017-08-31 17:20:43.0","2017-05-22 17:05:43.0","patissier.fr"), 
(9,  "2017-09-04 14:35:38.0","2017-07-04 07:30:25.0","boulanger.fr"), 
(10, "2017-09-07 15:10:34.0","2017-07-29 12:10:34.0","patissier.fr"))).toDF("id", "date1","date2", "mail")

df = df.withColumn("date1", (unix_timestamp($"date1", "yyyy-MM-dd HH:mm:ss").cast("timestamp")))
df = df.withColumn("date2", (unix_timestamp($"date2", "yyyy-MM-dd HH:mm:ss").cast("timestamp")))

df = df.orderBy("date1", "date2")

Это выглядит так:

+---+---------------------+---------------------+------------+
|id |date1                |date2                |mail        |
+---+---------------------+---------------------+------------+
|3  |2017-06-28 10:31:42.0|2017-02-28 20:31:42.0|boulanger.fr|
|1  |2017-06-29 10:53:53.0|2017-06-25 15:00:53.0|boulanger.fr|
|2  |2017-07-05 10:48:57.0|2017-09-05 09:00:53.0|patissier.fr|
|5  |2017-07-28 11:22:42.0|2017-05-28 11:22:42.0|boulanger.fr|
|4  |2017-08-21 17:31:12.0|2017-10-21 10:29:12.0|patissier.fr|
|6  |2017-08-23 17:03:43.0|2017-07-23 09:03:43.0|patissier.fr|
|7  |2017-08-24 16:08:07.0|2017-08-22 16:08:07.0|boulanger.fr|
|8  |2017-08-31 17:20:43.0|2017-05-22 17:05:43.0|patissier.fr|
|9  |2017-09-04 14:35:38.0|2017-07-04 07:30:25.0|boulanger.fr|
|10 |2017-09-07 15:10:34.0|2017-07-29 12:10:34.0|patissier.fr|
+---+---------------------+---------------------+------------+

Для каждого идентификатора я хочу посчитать среди всех других строк количество строк с:

  1. дата1 в [my_current_date1-60 день, my_current_date1-1 день]
  2. дата2
  3. то же письмо, что и мой current_mail

ЕслиЯ смотрю на строку 5 и хочу вернуть номер строки с:

  1. date1 в [2017-05-29 11: 22: 42.0, 2017-07-27 11:22: 42.0]
  2. date2 <2017-07-28 11: 22: 42.0 </li>
  3. mail = boulanger.fr

-> Результат будетбыть 2 (соответствует id 1 и id 3)

Так что я хотел бы сделать что-то вроде:

val w = Window.partitionBy("mail").orderBy(col("date1").cast("long")).rangeBetween(-60*24*60*60,-1*24*60*60)
var df= df.withColumn("all_previous", count("mail") over w)

Но это будет отвечать на условие 1 и условие 3, но не на второе... мне нужно что-то добавить, чтобы включить это второе условие сравнения date2 с my_date1 ...

1 Ответ

0 голосов
/ 07 декабря 2018

Используя обобщенную спецификацию Window с last(date1), являющимся текущим date1 на раздел окна, и sum над 0 и 1 в качестве условного подсчета, вот как я включил бы ваше условие # 2 в критерии подсчета:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

def days(n: Long): Long = n * 24 * 60 * 60

val w = Window.partitionBy("mail").orderBy($"date1".cast("long"))
val w1 = w.rangeBetween(days(-60), days(0))
val w2 = w.rangeBetween(days(-60), days(-1))

df.withColumn("all_previous", sum(
      when($"date2".cast("long") < last($"date1").over(w1).cast("long"), 1).
        otherwise(0)
    ).over(w2)
  ).na.fill(0).
  show
// +---+-------------------+-------------------+------------+------------+
// | id|              date1|              date2|        mail|all_previous|
// +---+-------------------+-------------------+------------+------------+
// |  3|2017-06-28 10:31:42|2017-02-28 20:31:42|boulanger.fr|           0|
// |  1|2017-06-29 10:53:53|2017-06-25 15:00:53|boulanger.fr|           1|
// |  5|2017-07-28 11:22:42|2017-05-28 11:22:42|boulanger.fr|           2|
// |  7|2017-08-24 16:08:07|2017-08-22 16:08:07|boulanger.fr|           3|
// |  9|2017-09-04 14:35:38|2017-07-04 07:30:25|boulanger.fr|           2|
// |  2|2017-07-05 10:48:57|2017-09-05 09:00:53|patissier.fr|           0|
// |  4|2017-08-21 17:31:12|2017-10-21 10:29:12|patissier.fr|           0|
// |  6|2017-08-23 17:03:43|2017-07-23 09:03:43|patissier.fr|           0|
// |  8|2017-08-31 17:20:43|2017-05-22 17:05:43|patissier.fr|           1|
// | 10|2017-09-07 15:10:34|2017-07-29 12:10:34|patissier.fr|           2|
// +---+-------------------+-------------------+------------+------------+

[ОБНОВЛЕНИЕ]

Это решение неверно, даже если результат, по-видимому, является правильным с образцом набора данных.В частности, last($"date1").over(w1) не сработало так, как задумывалось.Мы надеемся, что ответ будет служить руководством для рабочего решения.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...