Как загрузить данные в класс кейсов Product, используя Dataframe в Spark - PullRequest
0 голосов
/ 30 мая 2018

У меня есть текстовый файл, и у меня есть данные, подобные приведенным ниже:

productId|price|saleEvent|rivalName|fetchTS 
123|78.73|Special|VistaCart.com|2017-05-11 15:39:30 
123|45.52|Regular|ShopYourWay.com|2017-05-11 16:09:43 
123|89.52|Sale|MarketPlace.com|2017-05-11 16:07:29 
678|1348.73|Regular|VistaCart.com|2017-05-11 15:58:06 
678|1348.73|Special|ShopYourWay.com|2017-05-11 15:44:22 
678|1232.29|Daily|MarketPlace.com|2017-05-11 15:53:03 
777|908.57|Daily|VistaCart.com|2017-05-11 15:39:01 

Мне нужно найти минимальную цену продукта на разных веб-сайтах, например, мой результат должен быть таким:

productId|price|saleEvent|rivalName|fetchTS 
123|45.52|Regular|ShopYourWay.com|2017-05-11 16:09:43 
678|1232.29|Daily|MarketPlace.com|2017-05-11 15:53:03 
777|908.57|Daily|VistaCart.com|2017-05-11 15:39:01 

Я пытаюсь так:

case class Product(productId:String, price:Double, saleEvent:String, rivalName:String, fetchTS:String)

val cDF = spark.read.text("/home/prabhat/Documents/Spark/sampledata/competitor_data.txt")
val (header,values) = cDF.collect.splitAt(1)
values.foreach(x => Product(x(0).toString, x(1).toString.toDouble, 
x(2).toString, x(3).toString, x(4).toString))

Получение исключения при выполнении последней строки:

 java.lang.ArrayIndexOutOfBoundsException: 1
 at org.apache.spark.sql.catalyst.expressions.GenericRow
 .get(rows.scala:174)
 at org.apache.spark.sql.Row$class.apply(Row.scala:163)
 at 
 org.apache.spark.sql.catalyst.expressions.GenericRow
 .apply(rows.scala:166
 )
 at $anonfun$1.apply(<console>:28)
 at $anonfun$1.apply(<console>:28)
 at scala.collection.IndexedSeqOptimized$class.foreach
 (IndexedSeqOptimized.scala:33)
 at 
 scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
 ... 49 elided

Вывод значения в значениях :

scala> values
res2: **Array[org.apache.spark.sql.Row]** = ` 
Array([123|78.73|Special|VistaCart.com|2017-05-11 15:39:30 ], 
[123|45.52|Regular|ShopYourWay.com|2017-05-11 16:09:43 ], 
[123|89.52|Sale|MarketPlace.com|2017-05-11 16:07:29 ], 
[678|1348.73|Regular|VistaCart.com|2017-05-11 15:58:06 ], 
[678|1348.73|Special|ShopYourWay.com|2017-05-11 15:44:22 ], 
[678|1232.29|Daily|MarketPlace.com|2017-05-11 15:53:03 ], 
[777|908.57|Daily|VistaCart.com|2017-05-11 15:39:01 ]`
scala> 

Я могу понять, что мне нужно split("|").

scala> val xy = values.foreach(x => x.toString.split("|").toSeq)
xy: Unit = ()

Так что после разбиения он дает мне Unit класс, то есть void, поэтому не может загрузить значения в класс Product.Как я могу загрузить этот Dataframe в Product case case?Я пока не хочу использовать Dataset, хотя Dataset безопасен для типов.

Я использую Spark 2.3 и Scala 2.11.

1 Ответ

0 голосов
/ 31 мая 2018

Проблема в том, что split принимает регулярное выражение, что означает, что вам нужно использовать "\\|" вместо одного "|".Кроме того, foreach необходимо изменить на map, чтобы фактически дать возвращаемое значение, то есть:

val xy = values.map(x => x.toString.split("\\|"))

Однако лучшим подходом будет чтение данных в виде файла CSVс | разделителями.Таким образом, вам не нужно обрабатывать заголовок особым образом, и, выводя типы столбцов, нет необходимости делать какие-либо преобразования (здесь я изменил fetchTS на метку времени):

case class Product(productId: String, price: Double, saleEvent: String, rivalName: String, fetchTS: Timestamp)

val df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .option("sep", "|")
  .csv("/home/prabhat/Documents/Spark/sampledata/competitor_data.txt")
  .as[Product]

Последняя строка преобразует информационный фрейм в класс Product.Если вы хотите использовать его вместо СДР, просто добавьте .rdd в конце.

После того, как это будет сделано, используйте groupBy и agg для получения окончательных результатов.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...