Pyspark - Как объединить за 4 часа окна в группы - PullRequest
4 голосов
/ 03 октября 2019

У меня есть следующий набор данных:

id  email   Date_of_purchase    time_of_purchase
1   abc@gmail.com   11/10/18    12:10 PM
2   abc@gmail.com   11/10/18    02:11 PM
3   abc@gmail.com   11/10/18    03:14 PM
4   abc@gmail.com   11/11/18    06:16 AM
5   abc@gmail.com   11/11/18    09:10 AM
6   def@gmail.com   11/10/18    12:17 PM
7   def@gmail.com   11/10/18    03:24 PM
8   def@gmail.com   11/10/18    08:16 PM
9   def@gmail.com   11/10/18    09:13 PM
10  def@gmail.com   11/11/18    12:01 AM

Я хочу рассчитать количество транзакций, выполненных каждым идентификатором электронной почты в течение 4 часов. Например, идентификаторы электронной почты: abc@gmail.com осуществили 3 транзакции, начиная с 11.10.18-12 до 18.10.16, 16.10 и осуществили 2 транзакции, начиная с 11.11.18.66, 11.11.18. 10.16 утра. идентификаторы электронной почты: def@gmail.com совершил 2 транзакции, начиная с 11.11.18.12 до 18.11.16 и 17.47, и совершил 3 транзакции, начиная с 10.11.18.16 до 11.11.18.16.

Мой желаемый результат:

 email          hour_interval                           purchase_in_4_hours
abc@gmail.com   [11/10/18 12.10 PM to 11/10/18 4.10 PM] 3
abc@gmail.com   [11/11/18 6.16 AM to 11/11/18 10.16 AM] 2
def@gmail.com   [11/10/18 12.17 PM to 11/10/18 4.17 PM] 2
def@gmail.com   [11/10/18 8.16 PM to 11/11/18 12.16 AM] 3

В моем наборе данных содержится 1000 000 строк. Я очень новичок в искре. Любая помощь будет высоко оценена. PS Временной интервал может изменяться от 4 часов до 1 часа, 6 часов, 1 дня и т. Д.

TIA.

1 Ответ

5 голосов
/ 12 октября 2019

Идея состоит в том, чтобы разделить данные по электронной почте, отсортировать в каждом разделе по дате и времени, а затем сопоставить каждый раздел с желаемым результатом. Этот подход будет работать, если данные для каждого раздела (= данные для одного адреса электронной почты) помещаются в память одного исполнителя Spark.

Актуальная логика Spark выполняет шаги

  1. Создатьновый столбец, который содержит метки времени
  2. Разделение данных по электронной почте, чтобы все строки с одним и тем же адресом электронной почты были частью одного раздела. Обратите внимание, что в одном разделе могут находиться данные из нескольких писем.
  3. Сортировать каждый раздел по электронной почте и отметке времени.
  4. Обработать каждый раздел. Если необходимо, создайте несколько выходов для каждого раздела, как требуется
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark.sql.types import Row
from datetime import datetime, timedelta

spark = SparkSession.builder.appName("test").getOrCreate()
df = spark.read.option("header", "true").csv(<path>) #or any other data source
df = df.withColumn("date_time", to_timestamp(concat(col("Date_of_purchase"), lit(" "), col("time_of_purchase")), "MM/dd/yy hh:mm aa")) \
    .drop("Date_of_purchase", "time_of_purchase") \
    .repartition(col("email")) \
    .sortWithinPartitions(col("email"), col("date_time"))

def process_partition(df_chunk):
    row_list = list(df_chunk)
    if len(row_list) == 0:
        return
    email = row_list[0]['email']
    start = row_list[0]['date_time']
    end = start + timedelta(hours=4)
    count = 0
    for row in row_list:
        if email == row['email'] and end > row['date_time']:
            count = count +1
        else:
            yield Row(email, start, end, count)
            email = row['email']
            start = row['date_time']
            end = start + timedelta(hours=4)
            count = 1
    yield Row(email, start, end, count)

result = df.rdd.mapPartitions(process_partition).toDF(["email", "from", "to", "count"])
result.show()

Выход:

+-------------+-------------------+-------------------+-----+
|        email|               from|                 to|count|
+-------------+-------------------+-------------------+-----+
|def@gmail.com|2018-11-10 12:17:00|2018-11-10 16:17:00|    2|
|def@gmail.com|2018-11-10 20:16:00|2018-11-11 00:16:00|    3|
|abc@gmail.com|2018-11-10 12:10:00|2018-11-10 16:10:00|    3|
|abc@gmail.com|2018-11-11 06:16:00|2018-11-11 10:16:00|    2|
+-------------+-------------------+-------------------+-----+

Чтобы изменить длину периода, timedelta s можно установить на любойзначение.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...