Как посчитать отдельный элемент по нескольким столбцам и покатому окну в PySpark - PullRequest
1 голос
/ 10 января 2020

Давайте представим, что у нас есть следующий фрейм данных:

port | flag | timestamp

---------------------------------------

20  | S    | 2009-04-24T17:13:14+00:00

30  | R    | 2009-04-24T17:14:14+00:00

32  | S    | 2009-04-24T17:15:14+00:00

21  | R    | 2009-04-24T17:16:14+00:00

54  | R    | 2009-04-24T17:17:14+00:00

24  | R    | 2009-04-24T17:18:14+00:00

Я хотел бы рассчитать количество различных port, flag за 3 часа в Pyspark.

Результатом будет что-то как:

port | flag | timestamp | distinct_port_flag_overs_3h

---------------------------------------

20   | S    | 2009-04-24T17:13:14+00:00 | 1

30   | R    | 2009-04-24T17:14:14+00:00 | 1

32   | S    | 2009-04-24T17:15:14+00:00 | 2

21   | R    | 2009-04-24T17:16:14+00:00 | 2

54   | R    | 2009-04-24T17:17:14+00:00 | 2

24   | R    | 2009-04-24T17:18:14+00:00 | 3

Запрос SQL выглядит следующим образом:

SELECT     
COUNT(DISTINCT port) OVER my_window AS distinct_port_flag_overs_3h
FROM my_table
WINDOW my_window AS (
    PARTITION BY flag
    ORDER BY CAST(timestamp AS timestamp)
    RANGE BETWEEN INTERVAL 3 HOUR PRECEDING AND CURRENT
)

Я нашел этот топи c, который решает проблему, но только если мы хотим для подсчета различных элементов в одном поле.

Есть ли у кого-нибудь представление о том, как этого добиться: *

pyspark 2.4.4

Ответы [ 2 ]

2 голосов
/ 10 января 2020

Просто соберите набор структур (port, flag) и получите его размер. Как то так:

w = Window.partitionBy("flag").orderBy("timestamp").rangeBetween(-10800, Window.currentRow)

df.withColumn("timestamp", to_timestamp("timestamp").cast("long"))\
  .withColumn("distinct_port_flag_overs_3h", size(collect_set(struct("port", "flag")).over(w)))\
  .orderBy(col("timestamp"))\
  .show()
0 голосов
/ 13 января 2020

Я только что написал код, который работает так:


def hive_time(time:str)->int:
    """
    Convert string time to number of seconds
    time : str : must be in the following format, numberType
    For exemple 1hour, 4day, 3month
    """
    match = re.match(r"([0-9]+)([a-z]+)", time, re.I)
    if match:
        items = match.groups()
        nb, kind = items[0], items[1]
        try :
            nb = int(nb)
        except ValueError as e:
            print(e,  traceback.format_exc())
            print("The format of {} which is your time aggregaation is not recognize. Please read the doc".format(time))

        if kind == "second":
            return nb
        if kind == "minute":
            return 60*nb
        if kind == "hour":
            return 3600*nb
        if kind == "day":
            return 24*3600*nb

    assert False, "The format of {} which is your time aggregaation is not recognize. \
    Please read the doc".format(time)


# Rolling window in spark
def distinct_count_over(data, window_size:str, out_column:str, *input_columns, time_column:str='timestamp'):
    """
    data : pyspark dataframe
    window_size : Size of the rolling window, check the doc for format information
    out_column : name of the column where you want to stock the results
    input_columns : the columns where you want to count distinct 
    time_column : the name of the columns where the timefield is stocked (must be in ISO8601)

    return : a new dataframe whith the stocked result 
    """

    concatenated_columns = F.concat(*input_columns)

    w = (Window.orderBy(F.col("timestampGMT").cast('long')).rangeBetween(-hive_time(window_size), 0))

    return data \
.withColumn('timestampGMT', data.timestampGMT.cast(time_column)) \
.withColumn(out_column, F.size(F.collect_set(concatenated_columns).over(w)))

Работает хорошо, еще не проверял мониторинг производительности.

...