агрегация данных в кадре столбца на основе условия в scala - PullRequest
0 голосов
/ 03 февраля 2020

У меня есть данные csv в следующем формате.

Мне нужно найти 2 лучших поставщиков, чей оборот превышает 100 в 2017 году.

Оборот = Сумма (счета-фактуры) чей статус «Оплачено полностью» - сумма (счета-фактуры, чей статус «Исключение» или «Отклонено»)

Я загрузил данные из csv в блокнот datebricks scala следующим образом:

val invoices_data = spark.read.format(file_type)
                  .option("header", "true")
                  .option("dateFormat", "M/d/yy")
                  .option("inferSchema", "true")
                 .load("invoice.csv")

Затем я попытался создать группу по имени поставщика

val avg_invoice_by_vendor = invoices_data.groupBy("VendorName")

Но сейчас я не знаю, как действовать дальше.

Вот пример данных CSV.

Id     InvoiceDate      Status         Invoice   VendorName
    2   2/23/17         Exception       23        V1
    3   11/23/17        Paid-in-Full    56        V1
    1   12/20/17        Paid-in-Full    12        V1
    5   8/4/19          Paid-in-Full    123       V2
    6   2/6/17          Paid-in-Full    237       V2
    9   3/9/17          Rejected        234       V2
    7   4/23/17         Paid-in-Full    78        V3
    8   5/23/17         Exception       345       V4

Ответы [ 2 ]

1 голос
/ 04 февраля 2020

Я использовал метод разворота для решения вышеуказанной проблемы.

invoices_data
              .filter(invoices_data("InvoiceStatusDesc") === "Paid-in-Full" || 
                invoices_data("InvoiceStatusDesc") === "Exception" ||
                invoices_data("InvoiceStatusDesc") === "Rejected")
              .filter(year(to_date(invoices_data("InvoiceDate"), "M/d/yy")) === 2017)
              .groupBy("InvoiceVendorName").pivot("InvoiceStatusDesc").sum("InvoiceTotal")
1 голос
/ 03 февраля 2020

Вы можете использовать udf для подписания счета в зависимости от статуса и после группировки агрегирования df с помощью функции суммы:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.DateType
def signInvoice: (String, Int) => Int = (status: String, invoice: Int) => {
  status match {
    case "Exception" | "Rejected" => -invoice
    case "Paid-in-Full" => invoice
    case _ => throw new IllegalStateException("wrong status")
  }
}

val signInvoiceUdf = spark.udf.register("signInvoice", signInvoice)
val top2_vendorsDF = invoices_data
  .withColumn("InvoiceDate", col("InvoiceDate").cast(DateType))
  .filter(year(col("InvoiceDate")) === lit(2017))
  .withColumn("Invoice", col("Invoice").as[Int])
  .groupBy("VendorName")
  .agg(sum(signInvoiceUdf('Status, 'Invoice)).as("sum_invoice"))
  .filter(col("sum_invoice") > 100)
  .orderBy(col("sum_invoice").desc)
  .take(2)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...