Поскольку для вычисления 'count_change' важен порядок строк, я думаю, что можно загрузить данные в память драйвера, вычислить там значения и затем создать из них новый фрейм данных.
Пример кода написан на Java, но, полагаю, то же самое можно сделать и на Python.
/**
 * Demonstrates computing a per-row {@code count_change} column on the driver:
 * the rows are collected into a local list, the difference between each row's
 * {@code some_count} and that of the row following it in the list is computed,
 * and a new data frame is built from the resulting rows.
 */
@Test
public void test() {
    // Schema of the sample input: id, date (as a string) and the counter.
    StructType schema = createStructType(Arrays.asList(
            createStructField("id", IntegerType, true),
            createStructField("date", StringType, true),
            createStructField("some_count", IntegerType, true)));

    // The sample rows are listed newest-first; the logic below relies on that order.
    Dataset<Row> data = ss.createDataFrame(Arrays.asList(
            RowFactory.create(3, "2020-03-31", 5),
            RowFactory.create(2, "2020-03-24", 6),
            RowFactory.create(1, "2020-03-17", 3)), schema);

    // Append 'count_change' initialised to 0 so the final schema is already available.
    Dataset<Row> dataWithColumn = data.withColumn("count_change", lit(0));

    // Pull the rows into a local list so they can be walked in order.
    List<Row> collected = dataWithColumn.collectAsList();
    int lastIndex = collected.size() - 1;

    // The last row in the list has no successor, so it keeps count_change = 0.
    Row following = collected.get(lastIndex);
    List<Row> rebuilt = new ArrayList<>();
    rebuilt.add(following);

    // Walk from the second-to-last row up to the first, remembering the row
    // visited just before (i.e. the one at index + 1).
    for (int index = lastIndex - 1; index >= 0; index--) {
        Row current = collected.get(index);
        int delta = current.getInt(current.fieldIndex("some_count"))
                - following.getInt(following.fieldIndex("some_count"));
        rebuilt.add(RowFactory.create(current.get(0), current.get(1), current.get(2), delta));
        following = current;
    }

    // Build the new data frame with the extended schema and order it by date, descending.
    Dataset<Row> rebuiltFrame = ss.createDataFrame(rebuilt, dataWithColumn.schema());
    Dataset<Row> result = rebuiltFrame.sort(col("date").desc());
    result.show();
}
Вот результат вызова show():
+---+----------+----------+------------+
| id|      date|some_count|count_change|
+---+----------+----------+------------+
|  3|2020-03-31|         5|          -1|
|  2|2020-03-24|         6|           3|
|  1|2020-03-17|         3|           0|
+---+----------+----------+------------+