Мой код должен извлечь Map
из dataframe
. Карта будет использоваться позже для некоторых расчетов (сопоставление кредита с наиболее подходящим исходным биллингом). Однако первый шаг уже терпит неудачу - TransactionId
всегда возвращается как 0.
Упрощенная версия кода:
case class SalesTransaction(
CustomerId : Int,
Score : Int,
Revenue : Double,
Type : String,
Credited : Double = 0.0,
LinkedTransactionId : Int = 0,
IsProcessed : Boolean = false
)
val df = Seq(
(1, 1, 123, "Sales", 100),
(1, 2, 122, "Credit", 100),
(1, 3, 99, "Sales", 70),
(1, 4, 101, "Sales", 77),
(1, 5, 102, "Credit", 75),
(1, 6, 98, "Sales", 71),
(2, 7, 200, "Sales", 55),
(2, 8, 220, "Sales", 55),
(2, 9, 200, "Credit", 50),
(2, 10, 205, "Sales", 50)
).toDF("CustomerId", "TransactionId", "TransactionAttributesScore", "TransactionType", "Revenue")
.withColumn("Revenue", $"Revenue".cast(DoubleType))
.repartition($"CustomerId")
//map generation:
val m2 : Map[Int, SalesTransaction] =
df.map(row => (
row.getAs("TransactionId")
, new SalesTransaction(row.getAs("CustomerId")
, row.getAs("TransactionAttributesScore")
, row.getAs("Revenue")
, row.getAs("TransactionType")
)
)
).collect.toMap
m2.foreach(m => println("key: " + m._1 +" Value: "+ m._2))
Вывод имеет только самую последнюю запись, поскольку все значения, захваченные row.getAs("TransactionId")
, равны нулю (т. Е. Переводится как 0 в карте m2), таким образом, кортеж, созданный в каждой итерации, равен (null, [current row SalesTransaction])
.
Не могли бы вы дать мне совет, что может быть не так с моим кодом? Я совершенно новичок в Scala и, должно быть, здесь не хватает синтаксического нюанса.