Идея состоит в том, чтобы разделить данные по электронной почте, отсортировать в каждом разделе по дате и времени, а затем сопоставить каждый раздел с желаемым результатом. Этот подход будет работать, если данные для каждого раздела (= данные для одного адреса электронной почты) помещаются в память одного исполнителя Spark.
Актуальная логика Spark выполняет шаги
- Создатьновый столбец, который содержит метки времени
- Разделение данных по электронной почте, чтобы все строки с одним и тем же адресом электронной почты были частью одного раздела. Обратите внимание, что в одном разделе могут находиться данные из нескольких писем.
- Сортировать каждый раздел по электронной почте и отметке времени.
- Обработать каждый раздел. Если необходимо, создайте несколько выходов для каждого раздела, как требуется
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark.sql.types import Row
from datetime import datetime, timedelta
spark = SparkSession.builder.appName("test").getOrCreate()
df = spark.read.option("header", "true").csv(<path>) #or any other data source
df = df.withColumn("date_time", to_timestamp(concat(col("Date_of_purchase"), lit(" "), col("time_of_purchase")), "MM/dd/yy hh:mm aa")) \
.drop("Date_of_purchase", "time_of_purchase") \
.repartition(col("email")) \
.sortWithinPartitions(col("email"), col("date_time"))
def process_partition(df_chunk):
row_list = list(df_chunk)
if len(row_list) == 0:
return
email = row_list[0]['email']
start = row_list[0]['date_time']
end = start + timedelta(hours=4)
count = 0
for row in row_list:
if email == row['email'] and end > row['date_time']:
count = count +1
else:
yield Row(email, start, end, count)
email = row['email']
start = row['date_time']
end = start + timedelta(hours=4)
count = 1
yield Row(email, start, end, count)
result = df.rdd.mapPartitions(process_partition).toDF(["email", "from", "to", "count"])
result.show()
Выход:
+-------------+-------------------+-------------------+-----+
| email| from| to|count|
+-------------+-------------------+-------------------+-----+
|def@gmail.com|2018-11-10 12:17:00|2018-11-10 16:17:00| 2|
|def@gmail.com|2018-11-10 20:16:00|2018-11-11 00:16:00| 3|
|abc@gmail.com|2018-11-10 12:10:00|2018-11-10 16:10:00| 3|
|abc@gmail.com|2018-11-11 06:16:00|2018-11-11 10:16:00| 2|
+-------------+-------------------+-------------------+-----+
Чтобы изменить длину периода, timedelta
s можно установить на любойзначение.