Apache Spark / PySpark - Как постепенно увеличивать значения столбца? - PullRequest
0 голосов
/ 31 марта 2020

Приведенный ниже фрейм данных уже заполнен некоторыми данными.

(A)
--------------------------------
| id | date       | some_count |
--------------------------------
| 3  | 2020-03-31 |    5       | 
| 2  | 2020-03-24 |    6       |
| 1  | 2020-03-17 |    3       |
--------------------------------

Я хочу создать еще один фрейм данных на основе вышеизложенного, но с дополнительным столбцом, который содержит изменения за предыдущую неделю из some_count поле для каждой недели. (изменение первой записи принимается за 0, потому что у него нет записи на предыдущей неделе для сравнения)

(B)
-----------------------------------------------
| id | date       | some_count | count_change |
-----------------------------------------------
| 3  | 2020-03-31 |    5       |    -1        | 
| 2  | 2020-03-24 |    6       |     3        |
| 1  | 2020-03-17 |    3       |     0        |
-----------------------------------------------

Как выполнить этот расчет с Apache Spark (SQL / PySpark)?

1 Ответ

1 голос
/ 01 апреля 2020

Поскольку порядок важен для вычисления 'count_change', я думаю, что мы можем загрузить его в память драйвера, рассчитать и заново создать другой фрейм данных.
Пример кода реализован Java, но я считаю, что есть точно так же и в python.

@Test
public void test() {
    StructType schema = createStructType(Arrays.asList(
            createStructField("id", IntegerType, true),
            createStructField("date", StringType, true),
            createStructField("some_count", IntegerType, true)));

    // assume source data is already sorted by desc.
    Dataset<Row> data = ss.createDataFrame(Arrays.asList(
            RowFactory.create(3, "2020-03-31", 5),
            RowFactory.create(2, "2020-03-24", 6),
            RowFactory.create(1, "2020-03-17", 3)), schema);

    // add column and set 0 as default value.
    Dataset<Row> dataWithColumn = data.withColumn("count_change", lit(0));

    // load driver memory to calculate 'count_change' based on order.
    List<Row> dataWithColumnList = dataWithColumn.collectAsList();
    List<Row> newList = new ArrayList<>();

    // add first row which has count_change 0.
    newList.add(dataWithColumnList.get(dataWithColumnList.size() - 1));

    for (int i = dataWithColumnList.size() - 2; i >= 0; i--) {
        Row currWeek = dataWithColumnList.get(i);
        Row prevWeek = dataWithColumnList.get(i+1);
        int currCount = currWeek.getInt(currWeek.fieldIndex("some_count"));
        int prevCount = prevWeek.getInt(prevWeek.fieldIndex("some_count"));
        int countChange = currCount - prevCount;
        newList.add(RowFactory.create(currWeek.get(0), currWeek.get(1), currWeek.get(2), countChange));
    }

    Dataset<Row> result = ss.createDataFrame(newList, dataWithColumn.schema()).sort(col("date").desc());
    result.show();
}

Это результат show ():

+---+----------+----------+------------+
| id|      date|some_count|count_change|
+---+----------+----------+------------+
|  3|2020-03-31|         5|          -1|
|  2|2020-03-24|         6|           3|
|  1|2020-03-17|         3|           0|
+---+----------+----------+------------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...