У меня есть эти данные
+-----------------+----------------------+
| customer | date_time |
+-----------------+----------------------+
| A | 2020-4-10 12:10:00 |
+-----------------+----------------------+
| B | 2020-4-10 12:20:00 |
+-----------------+----------------------+
| A | 2020-4-10 12:20:00 |
+-----------------+----------------------+
| B | 2020-4-10 12:25:00 |
+-----------------+----------------------+
| B | 2020-4-10 12:40:00 |
+-----------------+----------------------+
| A | 2020-4-10 12:50:00 |
+-----------------+----------------------+
| A | 2020-4-10 13:30:00 |
+-----------------+----------------------+
| A | 2020-4-10 13:35:00 |
+-----------------+----------------------+
Я хочу сгруппировать клиентов в данных и затем рассчитать разницу во времени между каждой строкой и предыдущей строкой в группе; Дополнительный столбец A используется для хранения разницы во времени между текущей строкой данных в группе клиентов и предыдущей строкой данных
+-----------------+----------------------+-----------------+
| customer | date_time | duration |
+-----------------+----------------------+-----------------+
| A | 2020-4-10 12:10:00 | NaN |
+-----------------+----------------------+-----------------+
| A | 2020-4-10 12:20:00 | 10 |
+-----------------+----------------------+-----------------+
| A | 2020-4-10 12:50:00 | 30 |
+-----------------+----------------------+-----------------+
| A | 2020-4-10 13:30:00 | 40 |
+-----------------+----------------------+-----------------+
| A | 2020-4-10 13:35:00 | 5 |
+-----------------+----------------------+-----------------+
| B | 2020-4-10 12:20:00 | NaN |
+-----------------+----------------------+-----------------+
| B | 2020-4-10 12:25:00 | 5 |
+-----------------+----------------------+-----------------+
| B | 2020-4-10 12:40:00 | 15 |
+-----------------+----------------------+-----------------+
Как мне реализовать это требование в spark? Я использую python для разработки и потоковых данных
Я читаю и записываю данные следующим образом:
import json
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.sql.types import *
import threading
spark_session = SparkSession\
.builder\
.appName("test")\
.getOrCreate()
lines = spark_session\
.readStream\
.format("socket")\
.option("host", "127.0.0.1")\
.option("port", 9998)\
.load()
query_task = df\
.writeStream\
.outputMode("append")\
.format("console")\
.start()
query_task.awaitTermination()
Можете ли вы мне помочь? Я только начал изучать много вещей, которые я не понимаю. Если у вас есть какие-либо python книги по теме «Stream Stream Streaming», пожалуйста, порекомендуйте их мне, большое спасибо