Используйте Window для подсчета строк с условием if в scala - PullRequest
0 голосов
/ 16 ноября 2018

Я надеюсь, что вы мне поможете: -)

У меня есть датафрейм с опубликованным объявлением.Я хочу, чтобы для каждого идентификатора рекламы было подсчитано количество объявлений, опубликованных за 2 месяца, предшествовавших этому, по одному и тому же адресу электронной почты.

Я создал нижеприведенный кадр данных, чтобы лучше объяснить:

var df = sc.parallelize(Array(
(1,  "2017-06-29 10:53:53.0","boulanger.fr" ,"2017-06-28","2017-04-29"), 
(2,  "2017-07-05 10:48:57.0","patissier.fr","2017-07-04","2017-05-05"), 
(3,  "2017-06-28 10:31:42.0","boulanger.fr" ,"2017-08-16","2017-06-17"), 
(4,  "2017-08-21 17:31:12.0","patissier.fr","2017-08-20","2017-06-21"), 
(5,  "2017-07-28 11:22:42.0","boulanger.fr" ,"2017-08-22","2017-06-23"), 
(6,  "2017-08-23 17:03:43.0","patissier.fr","2017-08-22","2017-06-23"), 
(7,  "2017-08-24 16:08:07.0","boulanger.fr" ,"2017-08-23","2017-06-24"), 
(8,  "2017-08-31 17:20:43.0","patissier.fr","2017-08-30","2017-06-30"), 
(9,  "2017-09-04 14:35:38.0","boulanger.fr" ,"2017-09-03","2017-07-04"), 
(10, "2017-09-07 15:10:34.0","patissier.fr","2017-09-06","2017-07-07"))).toDF("id_advert", "creation_date",
    "email", "date_minus1","date_minus2m")

df = df.withColumn("date_minus1", to_date(unix_timestamp($"date_minus1", "yyyy-MM-dd").cast("timestamp")))  
df = df.withColumn("date_minus2", to_date(unix_timestamp($"date_minus2", "yyyy-MM-dd").cast("timestamp"))) 
df = df.withColumn("date_crecreation", (unix_timestamp($"creation_date", "yyyy-MM-dd HH:mm:ss").cast("timestamp")))
  • date_minus1 = за день до публикации объявления
  • date_minus2m = за 2 месяца до размещения объявления

Я хочу подсчитать количество объявлений с помощьютот же адрес электронной почты, между этими двумя датами ...

В результате я хочу получить следующее:

+---------+----------------+
|id_advert|nb_prev_advert  |
+---------+----------------+
|6        |2               |
|3        |3               |
|5        |3               |
|9        |2               |
|4        |1               |
|8        |3               |
|7        |3               |
|10       |3               |
+--------+-----------------+

Мне удается сделать это с помощью отдельного соединения с фреймом данных, но ямиллионы строк потребовалось почти 2 часа для запуска ...

Я уверен, что мы можем сделать что-то вроде:

val w = Window.partitionBy("id_advert").orderBy("creation_date").rowsBetween(-50000000, -1)

И использовать его для перехода по фрейму данных и только для подсчетастрока с

  • адресом электронной почты для строки = адрес электронной почты для текущей строки
  • date_minus2m для строки <дата создания текущей строки <date_minus1 для строки </li>

Ответы [ 3 ]

0 голосов
/ 19 ноября 2018

Поскольку невозможно правильно структурировать комментарий, я буду использовать кнопку ответа, но на самом деле это скорее вопрос, чем ответ.

Я упрощаю задачу, думая, что с вашим ответом я смогу сделать то, чтоЯ хочу сделать, но я не уверен, что правильно понимаю ваш ответ ...

Как это работает?Для меня:

  • , если я сделаю .rangeBetween (-3, -1), я буду использовать окно, которое выглядит на 3 строки перед текущей строкой на одну строку перед текущей строкой.Но здесь кажется, что rangeBetween ссылается на переменную orderby, а не на общее количество строк. ???
  • , если я делаю "partitionBy (col (" email "))", у меня должна быть одна строкапо электронной почте, но здесь я все еще получаю oneline от advert_id ...

Что я действительно хочу сделать, так это подсчитать, соответственно, количество проданных предметов и количество не проданных предметов за 2 месяца додата публикации объявления по тому же адресу.

Это простой способ использовать то, что вы сделали, и применить его к моей реальной проблеме?

Мой фрейм данных выглядит так:

var df = sc.parallelize(Array(
(1,  "2015-06-29 10:53:53.0","boulanger.fr", 1),
(2,  "2016-07-05 10:48:57.0","patissier.fr", 1),
(3,  "2017-06-28 10:31:42.0","boulanger.fr", 1),
(4,  "2017-08-21 17:31:12.0","patissier.fr", 0),
(5,  "2015-07-28 11:22:42.0","boulanger.fr", 0),
(6,  "2017-08-23 17:03:43.0","patissier.fr", 0),
(7,  "2017-08-24 16:08:07.0","boulanger.fr", 1),
(8,  "2014-08-31 17:20:43.0","patissier.fr", 1),
(9,  "2017-09-04 14:35:38.0","boulanger.fr", 1),
(10, "2012-09-07 15:10:34.0","patissier.fr", 0))).toDF("id_advert", "creation_date","email", "sold")

Для каждого id_advert мне бы хотелось иметь 2 строки.Один за количество проданных товаров и один за количество не проданных товаров ...

Заранее спасибо !!!Если вы не сможете ответить, я сделаю это более поспешным образом; -).

0 голосов
/ 19 ноября 2018

Добавление этого в качестве другого ответа, потому что он отличается

Ввод:

df.select("*").orderBy("email","creation_date").show()

+---------+--------------------+------------+----+
|id_advert|       creation_date|       email|sold|
+---------+--------------------+------------+----+
|        1|2015-06-29 10:53:...|boulanger.fr|   1|
|        5|2015-07-28 11:22:...|boulanger.fr|   0|
|        3|2017-06-28 10:31:...|boulanger.fr|   1|
|        7|2017-08-24 16:08:...|boulanger.fr|   1|
|        9|2017-09-04 14:35:...|boulanger.fr|   1|
|       10|2012-09-07 15:10:...|patissier.fr|   0|
|        8|2014-08-31 17:20:...|patissier.fr|   1|
|        2|2016-07-05 10:48:...|patissier.fr|   1|
|        4|2017-08-21 17:31:...|patissier.fr|   0|
|        6|2017-08-23 17:03:...|patissier.fr|   0|
+---------+--------------------+------------+----+

Теперь вы определяете свою спецификацию окна как что-то вроде

val w = Window.
          partitionBy("email").
          orderBy(col("creation_date"). 
          cast("timestamp").
          cast("long")).rangeBetween(-60*24*60*60,-1)

Иосновной запрос будет:

df.
  select(
      col("*"),count("email").over(w).alias("all_prev_mail_advert"), 
      sum("sold").over(w).alias("all_prev_sold_mail_advert")
  ).orderBy("email","creation_date").show()

Вывод:

+---------+--------------------+------------+----+--------------------+-------------------------+
|id_advert|       creation_date|       email|sold|all_prev_mail_advert|all_prev_sold_mail_advert|
+---------+--------------------+------------+----+--------------------+-------------------------+
|        1|2015-06-29 10:53:...|boulanger.fr|   1|                   0|                     null|
|        5|2015-07-28 11:22:...|boulanger.fr|   0|                   1|                        1|
|        3|2017-06-28 10:31:...|boulanger.fr|   1|                   0|                     null|
|        7|2017-08-24 16:08:...|boulanger.fr|   1|                   1|                        1|
|        9|2017-09-04 14:35:...|boulanger.fr|   1|                   1|                        1|
|       10|2012-09-07 15:10:...|patissier.fr|   0|                   0|                     null|
|        8|2014-08-31 17:20:...|patissier.fr|   1|                   0|                     null|
|        2|2016-07-05 10:48:...|patissier.fr|   1|                   0|                     null|
|        4|2017-08-21 17:31:...|patissier.fr|   0|                   0|                     null|
|        6|2017-08-23 17:03:...|patissier.fr|   0|                   1|                        0|
+---------+--------------------+------------+----+--------------------+-------------------------+

Объяснение:

Мы определяем оконную функцию за последние два месяца, разделенную по электронной почте.И подсчет в этом окне дает все предыдущие объявления для того же электронного письма.

И чтобы получить все предыдущие проданные объявления, мы просто добавляем столбец проданных в том же окне.Поскольку продано 1 для проданного предмета, сумма показывает количество проданных предметов в этом окне.

0 голосов
/ 18 ноября 2018

Вот ответ с использованием Window с диапазоном

Создайте спецификацию окна с диапазоном между текущим и прошлыми шестидесяти днями

val w = Window
          .partitionBy(col("email"))
          .orderBy(col("creation_date").cast("timestamp").cast("long"))
          .rangeBetween(-60*86400,-1)

Затем выберите его в вашем фрейме данных

df
 .select(col("*"),count("email").over(w).alias("trailing_count"))
 .orderBy("email","creation_date") //using this for display purpose 
 .show()

Примечание: ваш ожидаемый результат может быть неправильным.Во-первых, для рекламы должен быть хотя бы ноль, потому что что-то должно быть в начале строки для почты.Кроме того, рассчитывать на рекламу 3 кажется неправильным.

Входные данные:

df.select("id_advert","creation_date","email").orderBy("email", "creation_date").show()

+---------+--------------------+------------+
|id_advert|       creation_date|       email|
+---------+--------------------+------------+
|        3|2017-06-28 10:31:...|boulanger.fr|
|        1|2017-06-29 10:53:...|boulanger.fr|
|        5|2017-07-28 11:22:...|boulanger.fr|
|        7|2017-08-24 16:08:...|boulanger.fr|
|        9|2017-09-04 14:35:...|boulanger.fr|
|        2|2017-07-05 10:48:...|patissier.fr|
|        4|2017-08-21 17:31:...|patissier.fr|
|        6|2017-08-23 17:03:...|patissier.fr|
|        8|2017-08-31 17:20:...|patissier.fr|
|       10|2017-09-07 15:10:...|patissier.fr|
+---------+--------------------+------------+

Вывод:

+---------+--------------------+------------+-------------+--------------+
|id_advert|       creation_date|       email|date_creation|trailing_count|
+---------+--------------------+------------+-------------+--------------+
|        3|2017-06-28 10:31:...|boulanger.fr|   1498645902|             0|
|        1|2017-06-29 10:53:...|boulanger.fr|   1498733633|             1|
|        5|2017-07-28 11:22:...|boulanger.fr|   1501240962|             2|
|        7|2017-08-24 16:08:...|boulanger.fr|   1503590887|             3|
|        9|2017-09-04 14:35:...|boulanger.fr|   1504535738|             2|
|        2|2017-07-05 10:48:...|patissier.fr|   1499251737|             0|
|        4|2017-08-21 17:31:...|patissier.fr|   1503336672|             1|
|        6|2017-08-23 17:03:...|patissier.fr|   1503507823|             2|
|        8|2017-08-31 17:20:...|patissier.fr|   1504200043|             3|
|       10|2017-09-07 15:10:...|patissier.fr|   1504797034|             3|
+---------+--------------------+------------+-------------+--------------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...