Итеративная обработка RDD / Dataframe в Spark - PullRequest
0 голосов
/ 11 сентября 2018

Мое решение ADLA переводится на Spark.Я пытаюсь найти правильную замену для выражения U-SQL REDUCE , чтобы включить:

  1. Чтение логического раздела и сохранение информации в списке / словаре / векторе или другой структуре данныхв памяти
  2. Применение логики, требующей нескольких итераций
  3. Вывод результатов в виде дополнительных столбцов вместе с исходными данными (исходные строки могут быть частично удалены или дублированы)

Пример возможного задания:

  • Во входном наборе данных есть транзакции продажи и возврата с их идентификаторами и атрибутами
  • Предполагается, что решением будет найти наиболее вероятную продажу для каждого возврата
  • Транзакция возврата должна происходить после транзакции продажи и должна быть максимально похожа на транзакции продажи (наилучшее доступно соответствие)
  • Транзакция возврата должна быть связана ровно с одной транзакцией продажи;транзакция продажи может быть связана с одной или без транзакции возврата - ссылка должна быть зафиксирована в новом столбце LinkedTransactionId

Решение, вероятно, может быть достигнуто с помощью команды groupByKey, но яне удается определить, как применить логику в нескольких строках.Все примеры, которые мне удалось найти, представляют собой некоторые варианты встроенных функций (обычно агрегатных - например, .map(t => (t._1, t._2.sum))), которым не требуется информация об отдельных записях из одного раздела.

Может кто-нибудь поделиться примероманалогичного решения или указать мне правильное направление?

1 Ответ

0 голосов
/ 13 ноября 2018

Вот одно из возможных решений - отзывы и предложения по другому подходу или примеры итеративных решений Spark / Scala приветствуются:

  • Пример будет читать транзакции «Продажи и кредит» для каждого клиента (CustomerId) и обрабатывать каждого клиента как отдельный раздел (внешний цикл mapPartition)
  • Кредит будет привязан к продажам с наименьшей оценкой (то есть наименьшей разницей в оценке - с использованием внутреннего цикла foreach внутри каждого раздела)
  • Изменяемая карта trnMap предотвращает двойное назначение каждой транзакции и захватывает обновления из процесса
  • Результаты выводятся через iterator как в окончательный набор данных dfOut2

Примечание: в данном конкретном случае тот же результат мог бы быть достигнут при использовании оконных функций без использования итеративного решения, но цель состоит в том, чтобы протестировать саму итерационную логику)

import org.apache.spark.sql.SparkSession
import org.apache.spark._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.api.java.JavaRDD



case class Person(name: String, var age: Int)

case class SalesTransaction(
                      CustomerId : Int,
                      TransactionId : Int,
                      Score : Int,
                      Revenue : Double,
                      Type : String,
                      Credited : Double = 0.0,
                      LinkedTransactionId : Int = 0,
                      IsProcessed : Boolean = false
                      )

case class TransactionScore(
                           TransactionId : Int,
                           Score : Int
                           )

case class TransactionPair(
                          SalesId : Int,
                          CreditId : Int,
                          ScoreDiff : Int
                          )

object ExampleDataFramePartition{
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("Example Combiner")
      .config("spark.some.config.option", "some-value")
      .getOrCreate()

    import spark.implicits._

    val df = Seq(
      (1, 1, 123, "Sales", 100),
      (1, 2, 122, "Credit", 100),
      (1, 3, 99, "Sales", 70),
      (1, 4, 101, "Sales", 77),
      (1, 5, 102, "Credit", 75),
      (1, 6, 98, "Sales", 71),
      (2, 7, 200, "Sales", 55),
      (2, 8, 220, "Sales", 55),
      (2, 9, 200, "Credit", 50),
      (2, 10, 205, "Sales", 50)
    ).toDF("CustomerId", "TransactionId", "TransactionAttributesScore", "TransactionType", "Revenue")
      .withColumn("Revenue", $"Revenue".cast(DoubleType))
      .repartition(2,$"CustomerId")

    df.show()


    val dfOut2 = df.mapPartitions(p => {

      println(p)


      val trnMap = scala.collection.mutable.Map[Int, SalesTransaction]()
      val trnSales = scala.collection.mutable.ArrayBuffer.empty[TransactionScore]
      val trnCredits = scala.collection.mutable.ArrayBuffer.empty[TransactionScore]
      val trnPairs = scala.collection.mutable.ArrayBuffer.empty[TransactionPair]


      p.foreach(row => {
        val trnKey: Int = row.getAs[Int]("TransactionId")
        val trnValue: SalesTransaction = new SalesTransaction(row.getAs("CustomerId")
          , trnKey
          , row.getAs("TransactionAttributesScore")
          , row.getAs("Revenue")
          , row.getAs("TransactionType")
        )

        trnMap += (trnKey -> trnValue)
        if(trnValue.Type == "Sales") {
          trnSales += new TransactionScore(trnKey, trnValue.Score)}
        else {
          trnCredits += new TransactionScore(trnKey, trnValue.Score)}
      })

      if(trnCredits.size > 0 && trnSales.size > 0) {
        //define transaction pairs
        trnCredits.foreach(cr => {
          trnSales.foreach(sl => {
            trnPairs += new TransactionPair(cr.TransactionId, sl.TransactionId, math.abs(cr.Score - sl.Score))
          })
        })
      }

      trnPairs.sortBy(t => t.ScoreDiff)
          .foreach(t => {
            if(!trnMap(t.CreditId).IsProcessed && !trnMap(t.SalesId).IsProcessed){
              trnMap(t.SalesId) = new SalesTransaction(trnMap(t.SalesId).CustomerId
                , trnMap(t.SalesId).TransactionId
                , trnMap(t.SalesId).Score
                , trnMap(t.SalesId).Revenue
                , trnMap(t.SalesId).Type
                , math.min(trnMap(t.CreditId).Revenue, trnMap(t.SalesId).Revenue)
                , t.CreditId
                , true
              )
              trnMap(t.CreditId) = new SalesTransaction(trnMap(t.CreditId).CustomerId
                , trnMap(t.CreditId).TransactionId
                , trnMap(t.CreditId).Score
                , trnMap(t.CreditId).Revenue
                , trnMap(t.CreditId).Type
                , math.min(trnMap(t.CreditId).Revenue, trnMap(t.SalesId).Revenue)
                , t.SalesId
                , true
              )              
            }
          })

      trnMap.map(m => m._2).toIterator

    })

    dfOut2.show()


    spark.stop()
  }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...