У меня есть такие данные:
('2017-02-03', '22:57:00')
('2017-02-03', '23:02:00')
('2017-02-04', '09:56:00')
('2017-02-04', '10:01:00')
('2017-02-04', '10:06:00')
('2017-02-04', '10:11:00')
('2017-02-04', '10:16:00')
('2017-02-04', '10:21:00')
('2017-02-04', '10:26:00')
('2017-02-04', '10:31:00')
('2017-02-04', '10:36:00')
('2017-02-04', '16:57:00')
('2017-02-04', '17:12:00')
Что я хочу сделать, это сравнить время на каждой дате, чтобы увидеть, есть ли разница в 5 минут. Если разница в пять минут, я считаю, сколько их подряд. который даст такой результат:
('2017-02-03', '22:57:00') <- 1
('2017-02-03', '23:02:00') <- 2
('2017-02-04', '09:56:00') <- 1
('2017-02-04', '10:01:00') <- 2
('2017-02-04', '10:06:00') <- 3
('2017-02-04', '10:11:00') <- 4
('2017-02-04', '10:16:00') <- 5
('2017-02-04', '10:21:00') <- 6
('2017-02-04', '10:26:00') <- 7
('2017-02-04', '10:31:00') <- 8
('2017-02-04', '10:36:00') <- 9
('2017-02-04', '16:57:00') <- 1
('2017-02-04', '17:12:00') <- 1
в конце концов:
('2017-02-03', ('22:57:00', 2))
('2017-02-04', ('09:56:00', 9))
('2017-02-04', ('16:57:00', 1))
('2017-02-04', ('17:12:00', 1))
Пока это мой код
def check_interval(values, measurement):
start_date = ""
start_time = ""
counter = 1
res = ""
for index, val in enumerate(values):
if index + 1 == len(values):
break
date1, time1 = get_date_time(val)
date2, time2 = get_date_time(values[index + 1])
start_date = date1
if counter == 1:
start_time = time1
date_time1 = ' '.join(val)
date_time2 = ' '.join(values[index + 1])
time_diff = subtract_time(date_time1, date_time2)
if time_diff > timedelta(minutes=measurement):
res = start_date + "\t(" + start_time + ", " + str(counter) + ")\n"
print(res)
counter = 1
else:
counter += 1
if date1 != date2:
start_date = date2
# ------------------------------------------
# FUNCTION my_main
# ------------------------------------------
def my_main(sc, my_dataset_dir, station_name, measurement_time):
inputRDD = sc.textFile(my_dataset_dir)
stationRDD = inputRDD \
.map(process_line) \
.filter(lambda line: (line[0] == '0' and line[1] == station_name and line[5] == '0')) \
.map(lambda date_time: date_time[4]) \
.map(split_date_time) \
.sortByKey() \
.collect()
check_interval(stationRDD, measurement_time)
У меня есть результат, который я хочу, но я хочу знать, возможно ли добиться этого с помощью функции pyspark? и производим вывод:
('2017-02-03', ('22:57:00', 2))
('2017-02-04', ('09:56:00', 9))
('2017-02-04', ('16:57:00', 1))
('2017-02-04', ('17:12:00', 1))