PySpark: Как сгруппировать по фиксированному диапазону дат и другому столбцу, вычисляя сумму столбца значений с помощью оконных функций? - PullRequest
0 голосов
/ 27 марта 2019

У меня есть Spark DataFrame, состоящий из трех столбцов: Date, Item и Value типов Date, String и Double соответственно.Я хотел бы сгруппировать по диапазону дат (где продолжительность каждого диапазона составляет 7 дней, начиная с первой даты в кадре данных и выше) и Item, и вычислить суммы значений для каждой такой группы, определенной диапазоном дат (фактически, номером недели) и Item.

Я подозреваю, что в какой-то момент здесь нужно использовать функции Window PySpark для диапазонов дат, но я не могу понять, как их реализовать в этом случае.

1 Ответ

0 голосов
/ 01 апреля 2019

Давайте сначала определим подход для этого -

(a) Добавить столбец week_start_date для строки (каждая дата)

(b) Использовать столбец week_start_date в группе по (вместе с 'item')и вычислите сумму «value»

Генерируйте некоторые тестовые данные

from pyspark.sql.types import *

schema = StructType([StructField('date', StringType(),True),
                     StructField('item', StringType(),True),
                     StructField('value', DoubleType(),True)
    ]
    )

data = [('2019-01-01','I1',1.1),
        ('2019-01-02','I1',1.1),
        ('2019-01-10','I1',1.1),
        ('2019-01-10','I2',1.1),
        ('2019-01-11','I2',1.1),
        ('2019-01-11','I3',1.1)]

df = spark.createDataFrame(data, schema)

Python-функция для генерации week_start_date

from datetime import datetime, timedelta

def week_start_date(day):
    dt = datetime.strptime(day, '%Y-%m-%d')
    start = dt - timedelta(days=dt.weekday())
    end = start + timedelta(days=6)
    return start.strftime('%Y-%m-%d')

spark.udf.register('week_start_date',week_start_date)

Используйте функцию, чтобы сгенерировать week_start_date, а затем сгруппировать по week_start_date и элементу

 df.selectExpr("week_start_date(date) as start_date","date","item as item","value as value" ).\
        groupBy("start_date","item").\
        agg(sum('value').alias('value_sum')).\
        orderBy("start_date").\
        show()
...