У меня есть текстовый файл, и у меня есть данные, подобные приведенным ниже:
productId|price|saleEvent|rivalName|fetchTS
123|78.73|Special|VistaCart.com|2017-05-11 15:39:30
123|45.52|Regular|ShopYourWay.com|2017-05-11 16:09:43
123|89.52|Sale|MarketPlace.com|2017-05-11 16:07:29
678|1348.73|Regular|VistaCart.com|2017-05-11 15:58:06
678|1348.73|Special|ShopYourWay.com|2017-05-11 15:44:22
678|1232.29|Daily|MarketPlace.com|2017-05-11 15:53:03
777|908.57|Daily|VistaCart.com|2017-05-11 15:39:01
Мне нужно найти минимальную цену продукта на разных веб-сайтах, например, мой результат должен быть таким:
productId|price|saleEvent|rivalName|fetchTS
123|45.52|Regular|ShopYourWay.com|2017-05-11 16:09:43
678|1232.29|Daily|MarketPlace.com|2017-05-11 15:53:03
777|908.57|Daily|VistaCart.com|2017-05-11 15:39:01
Я пытаюсь так:
case class Product(productId:String, price:Double, saleEvent:String, rivalName:String, fetchTS:String)
val cDF = spark.read.text("/home/prabhat/Documents/Spark/sampledata/competitor_data.txt")
val (header,values) = cDF.collect.splitAt(1)
values.foreach(x => Product(x(0).toString, x(1).toString.toDouble,
x(2).toString, x(3).toString, x(4).toString))
Получение исключения при выполнении последней строки:
java.lang.ArrayIndexOutOfBoundsException: 1
at org.apache.spark.sql.catalyst.expressions.GenericRow
.get(rows.scala:174)
at org.apache.spark.sql.Row$class.apply(Row.scala:163)
at
org.apache.spark.sql.catalyst.expressions.GenericRow
.apply(rows.scala:166
)
at $anonfun$1.apply(<console>:28)
at $anonfun$1.apply(<console>:28)
at scala.collection.IndexedSeqOptimized$class.foreach
(IndexedSeqOptimized.scala:33)
at
scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
... 49 elided
Вывод значения в значениях :
scala> values
res2: **Array[org.apache.spark.sql.Row]** = `
Array([123|78.73|Special|VistaCart.com|2017-05-11 15:39:30 ],
[123|45.52|Regular|ShopYourWay.com|2017-05-11 16:09:43 ],
[123|89.52|Sale|MarketPlace.com|2017-05-11 16:07:29 ],
[678|1348.73|Regular|VistaCart.com|2017-05-11 15:58:06 ],
[678|1348.73|Special|ShopYourWay.com|2017-05-11 15:44:22 ],
[678|1232.29|Daily|MarketPlace.com|2017-05-11 15:53:03 ],
[777|908.57|Daily|VistaCart.com|2017-05-11 15:39:01 ]`
scala>
Я могу понять, что мне нужно split("|")
.
scala> val xy = values.foreach(x => x.toString.split("|").toSeq)
xy: Unit = ()
Так что после разбиения он дает мне Unit
класс, то есть void, поэтому не может загрузить значения в класс Product
.Как я могу загрузить этот Dataframe в Product
case case?Я пока не хочу использовать Dataset, хотя Dataset безопасен для типов.
Я использую Spark 2.3 и Scala 2.11.