В PySpark StructStream, Как найти разницу во времени между каждой строкой в ​​группе и предыдущей строкой - PullRequest
1 голос
/ 21 апреля 2020

У меня есть эти данные

+-----------------+----------------------+
|     customer    |       date_time      |
+-----------------+----------------------+
|       A         | 2020-4-10 12:10:00   |
+-----------------+----------------------+
|       B         | 2020-4-10 12:20:00   |
+-----------------+----------------------+
|       A         | 2020-4-10 12:20:00   |
+-----------------+----------------------+
|       B         | 2020-4-10 12:25:00   |
+-----------------+----------------------+
|       B         | 2020-4-10 12:40:00   |
+-----------------+----------------------+
|       A         | 2020-4-10 12:50:00   |
+-----------------+----------------------+
|       A         | 2020-4-10 13:30:00   |
+-----------------+----------------------+
|       A         | 2020-4-10 13:35:00   |
+-----------------+----------------------+

Я хочу сгруппировать клиентов в данных и затем рассчитать разницу во времени между каждой строкой и предыдущей строкой в ​​группе; Дополнительный столбец A используется для хранения разницы во времени между текущей строкой данных в группе клиентов и предыдущей строкой данных

+-----------------+----------------------+-----------------+
|     customer    |       date_time      |     duration    |
+-----------------+----------------------+-----------------+
|       A         | 2020-4-10 12:10:00   |       NaN       |
+-----------------+----------------------+-----------------+
|       A         | 2020-4-10 12:20:00   |       10        |
+-----------------+----------------------+-----------------+
|       A         | 2020-4-10 12:50:00   |       30        |
+-----------------+----------------------+-----------------+
|       A         | 2020-4-10 13:30:00   |       40        |
+-----------------+----------------------+-----------------+
|       A         | 2020-4-10 13:35:00   |       5         |
+-----------------+----------------------+-----------------+
|       B         | 2020-4-10 12:20:00   |      NaN        |
+-----------------+----------------------+-----------------+
|       B         | 2020-4-10 12:25:00   |       5         |
+-----------------+----------------------+-----------------+
|       B         | 2020-4-10 12:40:00   |       15        |
+-----------------+----------------------+-----------------+

Как мне реализовать это требование в spark? Я использую python для разработки и потоковых данных

Я читаю и записываю данные следующим образом:

import json
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.sql.types import *

import threading

spark_session = SparkSession\
    .builder\
    .appName("test")\
    .getOrCreate()

lines = spark_session\
    .readStream\
    .format("socket")\
    .option("host", "127.0.0.1")\
    .option("port", 9998)\
    .load()

query_task = df\
    .writeStream\
    .outputMode("append")\
    .format("console")\
    .start()

query_task.awaitTermination()

Можете ли вы мне помочь? Я только начал изучать много вещей, которые я не понимаю. Если у вас есть какие-либо python книги по теме «Stream Stream Streaming», пожалуйста, порекомендуйте их мне, большое спасибо

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...