Вот одно из возможных решений - отзывы и предложения по другому подходу или примеры итеративных решений Spark / Scala приветствуются:
- Пример будет читать транзакции «Продажи и кредит» для каждого клиента (
CustomerId
) и обрабатывать каждого клиента как отдельный раздел (внешний цикл mapPartition
)
- Кредит будет привязан к продажам с наименьшей оценкой (то есть наименьшей разницей в оценке - с использованием внутреннего цикла
foreach
внутри каждого раздела)
- Изменяемая карта
trnMap
предотвращает двойное назначение каждой транзакции и захватывает обновления из процесса
- Результаты выводятся через
iterator
как в окончательный набор данных dfOut2
Примечание: в данном конкретном случае тот же результат мог бы быть достигнут при использовании оконных функций без использования итеративного решения, но цель состоит в том, чтобы протестировать саму итерационную логику)
import org.apache.spark.sql.SparkSession
import org.apache.spark._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.api.java.JavaRDD
case class Person(name: String, var age: Int)
case class SalesTransaction(
CustomerId : Int,
TransactionId : Int,
Score : Int,
Revenue : Double,
Type : String,
Credited : Double = 0.0,
LinkedTransactionId : Int = 0,
IsProcessed : Boolean = false
)
case class TransactionScore(
TransactionId : Int,
Score : Int
)
case class TransactionPair(
SalesId : Int,
CreditId : Int,
ScoreDiff : Int
)
object ExampleDataFramePartition{
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("Example Combiner")
.config("spark.some.config.option", "some-value")
.getOrCreate()
import spark.implicits._
val df = Seq(
(1, 1, 123, "Sales", 100),
(1, 2, 122, "Credit", 100),
(1, 3, 99, "Sales", 70),
(1, 4, 101, "Sales", 77),
(1, 5, 102, "Credit", 75),
(1, 6, 98, "Sales", 71),
(2, 7, 200, "Sales", 55),
(2, 8, 220, "Sales", 55),
(2, 9, 200, "Credit", 50),
(2, 10, 205, "Sales", 50)
).toDF("CustomerId", "TransactionId", "TransactionAttributesScore", "TransactionType", "Revenue")
.withColumn("Revenue", $"Revenue".cast(DoubleType))
.repartition(2,$"CustomerId")
df.show()
val dfOut2 = df.mapPartitions(p => {
println(p)
val trnMap = scala.collection.mutable.Map[Int, SalesTransaction]()
val trnSales = scala.collection.mutable.ArrayBuffer.empty[TransactionScore]
val trnCredits = scala.collection.mutable.ArrayBuffer.empty[TransactionScore]
val trnPairs = scala.collection.mutable.ArrayBuffer.empty[TransactionPair]
p.foreach(row => {
val trnKey: Int = row.getAs[Int]("TransactionId")
val trnValue: SalesTransaction = new SalesTransaction(row.getAs("CustomerId")
, trnKey
, row.getAs("TransactionAttributesScore")
, row.getAs("Revenue")
, row.getAs("TransactionType")
)
trnMap += (trnKey -> trnValue)
if(trnValue.Type == "Sales") {
trnSales += new TransactionScore(trnKey, trnValue.Score)}
else {
trnCredits += new TransactionScore(trnKey, trnValue.Score)}
})
if(trnCredits.size > 0 && trnSales.size > 0) {
//define transaction pairs
trnCredits.foreach(cr => {
trnSales.foreach(sl => {
trnPairs += new TransactionPair(cr.TransactionId, sl.TransactionId, math.abs(cr.Score - sl.Score))
})
})
}
trnPairs.sortBy(t => t.ScoreDiff)
.foreach(t => {
if(!trnMap(t.CreditId).IsProcessed && !trnMap(t.SalesId).IsProcessed){
trnMap(t.SalesId) = new SalesTransaction(trnMap(t.SalesId).CustomerId
, trnMap(t.SalesId).TransactionId
, trnMap(t.SalesId).Score
, trnMap(t.SalesId).Revenue
, trnMap(t.SalesId).Type
, math.min(trnMap(t.CreditId).Revenue, trnMap(t.SalesId).Revenue)
, t.CreditId
, true
)
trnMap(t.CreditId) = new SalesTransaction(trnMap(t.CreditId).CustomerId
, trnMap(t.CreditId).TransactionId
, trnMap(t.CreditId).Score
, trnMap(t.CreditId).Revenue
, trnMap(t.CreditId).Type
, math.min(trnMap(t.CreditId).Revenue, trnMap(t.SalesId).Revenue)
, t.SalesId
, true
)
}
})
trnMap.map(m => m._2).toIterator
})
dfOut2.show()
spark.stop()
}
}