сумма предыдущей строки в текущей строке в искровой скале - PullRequest
0 голосов
/ 11 февраля 2019

Я пытаюсь настроить одно из значений столбца на основе значения в каком-либо другом фрейме данных.При этом, если оставшаяся сумма больше, мне нужно перенести следующую строку и рассчитать окончательную сумму.

Во время этой операции я не могу удержать предыдущую строку, оставшуюся от суммы до следующей операции строки.Я попытался использовать функцию окна запаздывания и использовать параметры итогов, но они не работают должным образом.

Я работаю со Scala.Вот входные данные

val consumption = sc.parallelize(Seq((20180101, 600), (20180201, 900),(20180301, 400),(20180401, 600),(20180501, 1000),(20180601, 1900),(20180701, 500),(20180801, 100),(20180901, 500))).toDF("Month","Usage")
consumption.show()
+--------+-----+
|   Month|Usage|
+--------+-----+
|20180101|  600|
|20180201|  900|
|20180301|  400|
|20180401|  600|
|20180501| 1000|
|20180601| 1900|
|20180701|  500|
|20180801|  100|
|20180901|  500|
+--------+-----+
val promo = sc.parallelize(Seq((20180101, 1000),(20180201, 100),(20180401, 3000))).toDF("PromoEffectiveMonth","promoAmount")
promo.show()
+-------------------+-----------+
|PromoEffectiveMonth|promoAmount|
+-------------------+-----------+
|           20180101|       1000|
|           20180201|        100|
|           20180401|       3000|
+-------------------+-----------+

ожидаемый результат:

val finaldf = sc.parallelize(Seq((20180101,600,400,600),(20180201,900,0,400),(20180301,400,0,0),(20180401,600,2400,600),(20180501,1000,1400,1000),(20180601,1900,0,500),(20180701,500,0,0),(20180801,100,0,0),(20180901,500,0,0))).toDF("Month","Usage","LeftOverPromoAmt","AdjustedUsage")
finaldf.show()
+--------+-----+----------------+-------------+
|   Month|Usage|LeftOverPromoAmt|AdjustedUsage|
+--------+-----+----------------+-------------+
|20180101|  600|             400|          600|
|20180201|  900|               0|          400|
|20180301|  400|               0|            0|
|20180401|  600|            2400|          600|
|20180501| 1000|            1400|         1000|
|20180601| 1900|               0|          500|
|20180701|  500|               0|            0|
|20180801|  100|               0|            0|
|20180901|  500|               0|            0|
+--------+-----+----------------+-------------+

Логика, которую я применяю, основана на соединении месяца и PromoEffective, необходимо применить сумму промо в столбце использования потребления до тех пор, пока сумма промо не станет равной нулю.

Например: в январе 18 месяца сумма промоакции равна 1000, после вычета из использования (600) сумма остатка промо-акции равна 400 иДопустимое использование - 600. Остаток свыше 400 будет рассмотрен на следующий месяц, и в феврале будет промо-акция, а окончательная сумма промо-акции составит 500. Здесь использование больше, чем использование.

Таким образом, оставшаяся сумма промо-акции равна нулю, а корректировка использования составляет 400 (900 - 500).

1 Ответ

0 голосов
/ 11 февраля 2019

Прежде всего, вам необходимо выполнить объединение left_outer, чтобы для каждой строки было соответствующее продвижение.Операция соединения выполняется с помощью полей Month и PromoEffectiveMonth из наборов данных Consumption и promo соответственно.Обратите внимание, что я создал новый столбец Timestamp.Он был создан с помощью функции Spark SQL unix_timestamp.Он будет использоваться для сортировки набора данных по дате.

val ds = consumption
    .join(promo, consumption.col("Month") === promo.col("PromoEffectiveMonth"), "left_outer")
    .select("UserID", "Month", "Usage", "promoAmount")
    .withColumn("Timestamp", unix_timestamp($"Month".cast("string"), "yyyyMMdd").cast(TimestampType))

Это результат этих операций.

+--------+-----+-----------+-------------------+
|   Month|Usage|promoAmount|          Timestamp|
+--------+-----+-----------+-------------------+
|20180301|  400|       null|2018-03-01 00:00:00|
|20180701|  500|       null|2018-07-01 00:00:00|
|20180901|  500|       null|2018-09-01 00:00:00|
|20180101|  600|       1000|2018-01-01 00:00:00|
|20180801|  100|       null|2018-08-01 00:00:00|
|20180501| 1000|       null|2018-05-01 00:00:00|
|20180201|  900|        100|2018-02-01 00:00:00|
|20180601| 1900|       null|2018-06-01 00:00:00|
|20180401|  600|       3000|2018-04-01 00:00:00|
+--------+-----+-----------+-------------------+

Далее необходимо создать Окно .Оконные функции используются для выполнения расчетов по группе записей с использованием некоторых критериев (подробнее об этом здесь ).В нашем случае критерием является сортировка каждой группы по Timestamp.

 val window = Window.orderBy("Timestamp")

Хорошо, теперь самое сложное.Вам необходимо создать определенную пользователем статистическую функцию .В этой функции каждая группа будет обрабатываться в соответствии с пользовательской операцией, и это позволит вам обрабатывать каждую строку с учетом значения предыдущей.

  class CalculatePromos extends UserDefinedAggregateFunction {
    // Input schema for this UserDefinedAggregateFunction
    override def inputSchema: StructType =
      StructType(
        StructField("Usage", LongType) ::
        StructField("promoAmount", LongType) :: Nil)

    // Schema for the parameters that will be used internally to buffer temporary values
    override def bufferSchema: StructType = StructType(
        StructField("AdjustedUsage", LongType) ::
        StructField("LeftOverPromoAmt", LongType) :: Nil
    )

    // The data type returned by this UserDefinedAggregateFunction.
    // In this case, it will return an StructType with two fields: AdjustedUsage and LeftOverPromoAmt
    override def dataType: DataType = StructType(Seq(StructField("AdjustedUsage", LongType), StructField("LeftOverPromoAmt", LongType)))

    // Whether this UDAF is deterministic or not. In this case, it is
    override def deterministic: Boolean = true

    // Initial values for the temporary values declared above
    override def initialize(buffer: MutableAggregationBuffer): Unit = {
      buffer(0) = 0L
      buffer(1) = 0L
    }

    // In this function, the values associated to the buffer schema are updated
    override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {

      val promoAmount = if(input.isNullAt(1)) 0L else input.getLong(1)
      val leftOverAmount = buffer.getLong(1)
      val usage = input.getLong(0)
      val currentPromo = leftOverAmount + promoAmount

      if(usage < currentPromo) {
        buffer(0) = usage
        buffer(1) = currentPromo - usage
      } else {
        if(currentPromo == 0)
          buffer(0) = 0L
        else
          buffer(0) = usage - currentPromo
        buffer(1) = 0L
      }
    }

    // Function used to merge two objects. In this case, it is not necessary to define this method since
    // the whole logic has been implemented in update
    override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {}

    // It is what you will return. In this case, a tuple of the buffered values which rerpesent AdjustedUsage and LeftOverPromoAmt
    override def evaluate(buffer: Row): Any = {
      (buffer.getLong(0), buffer.getLong(1))
    }

  }

По сути, она создает функциюкоторый может использоваться в Spark SQL, который получает два столбца (Usage и promoAmount, как указано в методе inputSchema) и возвращает новый столбец с двумя вложенными столбцами (AdjustedUsage и LeftOverPromAmt, как определено вметод dataType).Метод bufferSchema позволяет создать временное значение, которое будет использоваться для поддержки операций.В этом случае я определил AdjustedUsage и LeftOverPromoAmt.

Логика , которую вы применяете , реализована в методе update.В основном, это берет ранее вычисленные значения и обновляет их.Аргумент buffer содержит временные значения, определенные в bufferSchema, а input сохраняет значение строки, которая обрабатывается в этот момент.Наконец, evaluate возвращает объект кортежа, содержащий результат операций для каждой строки, в этом случае временные значения, определенные в bufferSchema и обновленные в методе update.

Следующим шагом являетсячтобы создать переменную, создав экземпляр класса CalculatePromos.

val calculatePromos = new CalculatePromos

Наконец, вы должны применить пользовательскую агрегатную функцию calculatePromos, используя метод withColumn набора данных.Обратите внимание, что вы должны передать ему входные столбцы (Usage и promoAmount) и применить окно, используя метод over.

ds
  .withColumn("output", calculatePromos($"Usage", $"promoAmount").over(window))
  .select($"Month", $"Usage", $"output.LeftOverPromoAmt".as("LeftOverPromoAmt"), $"output.AdjustedUsage".as("AdjustedUsage"))

Это результат:

+--------+-----+----------------+-------------+
|   Month|Usage|LeftOverPromoAmt|AdjustedUsage|
+--------+-----+----------------+-------------+
|20180101|  600|             400|          600|
|20180201|  900|               0|          400|
|20180301|  400|               0|            0|
|20180401|  600|            2400|          600|
|20180501| 1000|            1400|         1000|
|20180601| 1900|               0|          500|
|20180701|  500|               0|            0|
|20180801|  100|               0|            0|
|20180901|  500|               0|            0|
+--------+-----+----------------+-------------+

Надеюсь, это поможет.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...