Pyspark - как я могу изменить структуру вывода в следующем формате? - PullRequest
0 голосов
/ 16 июня 2020

У меня есть две таблицы, как показано ниже:

Таблица 1 -

enter image description here

Таблица 2 -

enter image description here

Я хотел бы, чтобы мой результат был похож на следующую таблицу со следующим расчетом для l oop, пока все недели в таблице 2 не пройдут через все дни таблицы 1:

table3 (week1_1) = table2 ( week1 ) * table1 ( day1 _ratio)

table3 (week1_2) = table2 ( week1 ) * table1 ( day2 _ratio)

enter image description here

Как это сделать?

Все помощь приветствуется !!

Спасибо

1 Ответ

1 голос
/ 16 июня 2020

Попробуйте это -

Написано на scala, но может быть перенесено в pyspark с минимальным изменением

Загрузить входные

   val table1 = Seq(
      ("o1", "i1", 1, 0.6),
      ("o1", "i1", 2, 0.4)
    ).toDF("outlet", "item", "day", "ratio")
    table1.show(false)
    /**
      * +------+----+---+-----+
      * |outlet|item|day|ratio|
      * +------+----+---+-----+
      * |o1    |i1  |1  |0.6  |
      * |o1    |i1  |2  |0.4  |
      * +------+----+---+-----+
      */

    val table2 = Seq(
      ("o1", "i1", 4, 5, 6, 8)
    ).toDF("outlet", "item", "week1", "week2", "week3", "week4")
    table2.show(false)
    /**
      * +------+----+-----+-----+-----+-----+
      * |outlet|item|week1|week2|week3|week4|
      * +------+----+-----+-----+-----+-----+
      * |o1    |i1  |4    |5    |6    |8    |
      * +------+----+-----+-----+-----+-----+
      */

Пользовательские функции искры

    table1.join(table2, Seq("outlet", "item"))
      .groupBy("outlet", "item")
      .pivot("day")
      .agg(
        first($"week1" * $"ratio").as("week1"),
        first($"week2" * $"ratio").as("week2"),
        first($"week3" * $"ratio").as("week3"),
        first($"week4" * $"ratio").as("week4")
      ).show(false)

    /**
      * +------+----+-------+-------+------------------+-------+-------+-------+------------------+-------+
      * |outlet|item|1_week1|1_week2|1_week3           |1_week4|2_week1|2_week2|2_week3           |2_week4|
      * +------+----+-------+-------+------------------+-------+-------+-------+------------------+-------+
      * |o1    |i1  |2.4    |3.0    |3.5999999999999996|4.8    |1.6    |2.0    |2.4000000000000004|3.2    |
      * +------+----+-------+-------+------------------+-------+-------+-------+------------------+-------+
      */

In python

from pyspark.sql import functions as F
 table1.join(table2, ['outlet', 'item'])       
.groupBy("outlet", "item")       
.pivot("day")       
.agg(         
F.first(df.week1 * df.ratio).alias("week1"),         
F.first(df.week2 * df.ratio).alias("week2"),         
F.first(df.week3 * df.ratio).alias("week3"),         
F.first(df.week4 * df.ratio).alias("week4")       
).show(truncate=False)
...