Писпарк 5 минут скользящая сумма окон - PullRequest
0 голосов
/ 05 мая 2019

У меня есть такие данные:

('2017-02-03', '22:57:00')
('2017-02-03', '23:02:00')
('2017-02-04', '09:56:00')
('2017-02-04', '10:01:00')
('2017-02-04', '10:06:00')
('2017-02-04', '10:11:00')
('2017-02-04', '10:16:00')
('2017-02-04', '10:21:00')
('2017-02-04', '10:26:00')
('2017-02-04', '10:31:00')
('2017-02-04', '10:36:00')
('2017-02-04', '16:57:00')
('2017-02-04', '17:12:00')

Что я хочу сделать, это сравнить время на каждой дате, чтобы увидеть, есть ли разница в 5 минут. Если разница в пять минут, я считаю, сколько их подряд. который даст такой результат:

('2017-02-03', '22:57:00') <- 1
('2017-02-03', '23:02:00') <- 2

('2017-02-04', '09:56:00') <- 1
('2017-02-04', '10:01:00') <- 2
('2017-02-04', '10:06:00') <- 3
('2017-02-04', '10:11:00') <- 4
('2017-02-04', '10:16:00') <- 5
('2017-02-04', '10:21:00') <- 6
('2017-02-04', '10:26:00') <- 7
('2017-02-04', '10:31:00') <- 8
('2017-02-04', '10:36:00') <- 9

('2017-02-04', '16:57:00') <- 1
('2017-02-04', '17:12:00') <- 1

в конце концов:

('2017-02-03', ('22:57:00', 2))
('2017-02-04', ('09:56:00', 9))
('2017-02-04', ('16:57:00', 1))
('2017-02-04', ('17:12:00', 1))

Пока это мой код

def check_interval(values, measurement):
    start_date = ""
    start_time = ""
    counter = 1
    res = ""

    for index, val in enumerate(values):
        if index + 1 == len(values):
            break

        date1, time1 = get_date_time(val)
        date2, time2 = get_date_time(values[index + 1])

        start_date = date1

        if counter == 1:
            start_time = time1

        date_time1 = ' '.join(val)
        date_time2 = ' '.join(values[index + 1])

        time_diff = subtract_time(date_time1, date_time2)

        if time_diff > timedelta(minutes=measurement):
            res = start_date + "\t(" + start_time + ", " + str(counter) + ")\n"
            print(res)
            counter = 1
        else:
            counter += 1

        if date1 != date2:
            start_date = date2


# ------------------------------------------
# FUNCTION my_main
# ------------------------------------------
def my_main(sc, my_dataset_dir, station_name, measurement_time):
   inputRDD = sc.textFile(my_dataset_dir)

   stationRDD = inputRDD \
        .map(process_line) \
        .filter(lambda line: (line[0] == '0' and line[1] == station_name and line[5] == '0')) \
        .map(lambda date_time: date_time[4]) \
        .map(split_date_time) \
        .sortByKey() \
        .collect()

    check_interval(stationRDD, measurement_time)

У меня есть результат, который я хочу, но я хочу знать, возможно ли добиться этого с помощью функции pyspark? и производим вывод:

('2017-02-03', ('22:57:00', 2))
('2017-02-04', ('09:56:00', 9))
('2017-02-04', ('16:57:00', 1))
('2017-02-04', ('17:12:00', 1))

1 Ответ

2 голосов
/ 05 мая 2019

API данных можно использовать с функциями window:

import pyspark.sql.functions as psf
from pyspark.sql import Window

w = Window.orderBy('datetime')
df \
    .withColumn('datetime', psf.unix_timestamp(psf.concat('date', psf.lit(' '), 'time').cast('timestamp'))) \
    .withColumn('5min_delta', (psf.col('datetime') - psf.lag('datetime').over(w)) / 60 > 5) \
    .fillna(True) \
    .withColumn('group_id', psf.sum(psf.col('5min_delta').cast('int')).over(w)).show()

        +----------+--------+----------+----------+--------+
        |      date|    time|  datetime|5min_delta|group_id|
        +----------+--------+----------+----------+--------+
        |2017-02-03|22:57:00|1486159020|      true|       1|
        |2017-02-03|23:02:00|1486159320|     false|       1|
        |2017-02-04|09:56:00|1486198560|      true|       2|
        |2017-02-04|10:01:00|1486198860|     false|       2|
        |2017-02-04|10:06:00|1486199160|     false|       2|
        |2017-02-04|10:11:00|1486199460|     false|       2|
        |2017-02-04|10:16:00|1486199760|     false|       2|
        |2017-02-04|10:21:00|1486200060|     false|       2|
        |2017-02-04|10:26:00|1486200360|     false|       2|
        |2017-02-04|10:31:00|1486200660|     false|       2|
        |2017-02-04|10:36:00|1486200960|     false|       2|
        |2017-02-04|16:57:00|1486223820|      true|       3|
        |2017-02-04|17:12:00|1486224720|      true|       4|
        +----------+--------+----------+----------+--------+
  • Первая функция окна заключается в вычислении разницы времени в минутах между двумя последовательными отметками времени.
  • Второй, позволяет нам создать уникальный идентификатор группы путем вычисления кумулятивной суммы.Он будет увеличиваться на единицу каждый раз, когда разрыв больше 5 минут.

Затем вы можете подсчитать количество элементов в каждой группе

df \
    .groupBy('group_id') \
    .agg(psf.first('date').alias('date'), psf.count('*').alias('nb')) \
    .show()

        +--------+----------+---+
        |group_id|      date| nb|
        +--------+----------+---+
        |       1|2017-02-03|  2|
        |       2|2017-02-04|  9|
        |       3|2017-02-04|  1|
        |       4|2017-02-04|  1|
        +--------+----------+---+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...