Я хочу иметь возможность создать значение запаздывания на основе значения в одном из столбцов.
в данных, предоставленных Qdf, - это фрейм данных Вопроса и АДФ данных Ответа.Я дал дополнительный столбец объяснения (который на самом деле мне не нужен в моих окончательных данных)
from pyspark.sql.window import Window
import pyspark.sql.functions as func
from pyspark.sql.types import *
from pyspark.sql import SQLContext
ID = ['A' for i in range(0,10)]+ ['B' for i in range(0,10)]
Day = range(1,11)+range(1,11)
Delay = [2, 2, 2, 3, 2, 4, 3, 2, 2, 2, 2, 2, 3, 2, 4, 3, 2, 2, 2, 3]
Despatched = [2, 3, 1, 4, 6, 2, 6, 5, 3, 6, 3, 1, 2, 4, 1, 2, 3, 3, 6, 1]
Delivered = [0, 0, 2, 3, 1, 0, 10, 0, 0, 13, 0, 0, 3, 1, 0, 6, 0, 0, 6, 3]
Explanation = ["-", "-", "-", "-", "-", "-", "10 (4+6)", "-", "-", "13 (2+6+5)", "-", "-", "-", "-", "-", "6 (2+4)", "-", "-", "6 (1+2+3)", "-"]
QSchema = StructType([StructField("ID", StringType()),StructField("Day", IntegerType()),StructField("Delay", IntegerType()),StructField("Despatched", IntegerType())])
Qdata = map(list, zip(*[ID,Day,Delay,Despatched]))
Qdf = spark.createDataFrame(Qdata,schema=QSchema)
Qdf.show()
+---+---+-----+----------+
| ID|Day|Delay|Despatched|
+---+---+-----+----------+
| A| 1| 2| 2|
| A| 2| 2| 3|
| A| 3| 2| 1|
| A| 4| 3| 4|
| A| 5| 2| 6|
| A| 6| 4| 2|
| A| 7| 3| 6|
| A| 8| 2| 5|
| A| 9| 2| 3|
| A| 10| 2| 6|
| B| 1| 2| 3|
| B| 2| 2| 1|
| B| 3| 3| 2|
| B| 4| 2| 4|
| B| 5| 4| 1|
| B| 6| 3| 2|
| B| 7| 2| 3|
| B| 8| 2| 3|
| B| 9| 2| 6|
| B| 10| 3| 1|
+---+---+-----+----------+
Отправленное количество должно быть записано как доставленное после времени задержки.В идеале было бы здорово, если бы я мог применить lag function
к отправляемому столбцу на основе задержки.Набор данных Ответ будет выглядеть следующим образом:
Adata = map(list, zip(*[ID,Day,Delay,Despatched,Delivered,Explanation]))
ASchema = StructType([StructField("ID", StringType()),StructField("Day", IntegerType()),StructField("Delay", IntegerType()),StructField("Despatched", IntegerType()),StructField("Delivered", IntegerType()),StructField("Explanation", StringType())])
Adf = spark.createDataFrame(Adata,schema=ASchema)
Adf.show()
+---+---+-----+----------+---------+-----------+
| ID|Day|Delay|Despatched|Delivered|Explanation|
+---+---+-----+----------+---------+-----------+
| A| 1| 2| 2| 0| -|
| A| 2| 2| 3| 0| -|
| A| 3| 2| 1| 2| -|
| A| 4| 3| 4| 3| -|
| A| 5| 2| 6| 1| -|
| A| 6| 4| 2| 0| -|
| A| 7| 3| 6| 10| 10 (4+6)|
| A| 8| 2| 5| 0| -|
| A| 9| 2| 3| 0| -|
| A| 10| 2| 6| 13| 13 (2+6+5)|
| B| 1| 2| 3| 0| -|
| B| 2| 2| 1| 0| -|
| B| 3| 3| 2| 3| -|
| B| 4| 2| 4| 1| -|
| B| 5| 4| 1| 0| -|
| B| 6| 3| 2| 6| 6 (2+4)|
| B| 7| 2| 3| 0| -|
| B| 8| 2| 3| 0| -|
| B| 9| 2| 6| 6| 6 (1+2+3)|
| B| 10| 3| 1| 3| -|
+---+---+-----+----------+---------+-----------+
Я попробовал приведенный ниже код, чтобы получить постоянное отставание в 2:
Qdf1=Qdf.withColumn('Delivered_lag',func.lag(Qdf['Despatched'],2).over(Window.partitionBy("ID").orderBy("Day")))
Но, когда я пытаюсь использовать лаг на одномСтолбец и отставание на другой столбец Я получаю сообщение об ошибке:
Qdf1=Qdf.withColumn('Delivered_lag',func.lag(Qdf['Despatched'],Qdf['Delay']).over(Window.partitionBy("ID").orderBy("Day")))
TypeError: объект 'Column' не вызывается
Как я могу обойти это?Я использую PySpark версии 2.3.1 и python версии 2.7.13.