Spark / Scala: от RDD [список <String>] до RDD [пользовательский объект] - PullRequest
0 голосов
/ 27 августа 2018

У меня есть данные во входном текстовом файле.Он содержит входные данные в формате: «PriceId, DateTime, PriceForUSA, PriceForUK, PriceForAUS».

Выглядит это так:

0000002,11-05-08-2016,0.92,1.68,0.81

0000003,07-05-08-2016,0.80,1.05,1.49

0000008,07-05-08-2016,1.43,1.29,1.22

Список стран фиксирован (США, Великобритания, AUS), и порядок цен в строках тоже фиксирован (PriceForUSA, PriceForUK, PriceForAUS).

Я читаю эти данные из файла, используя Spark Context, и преобразую их в RDD [List [String [].Каждый список в моей RDD представляет одну строку из входного текстового файла.

Например,

первый список содержит строки

"0000002", "11-05-08-2016", "0.92", "1.68", "0.81"

второй список содержит строки

"0000003", "07-05-08-2016" , "0.80", "1.05" , "1.49"

и т. Д.

У меня также есть собственный класс PriceInfo

case class PriceInfo(priceId: String, priceDate: String, country: String, price: Double) {

  override def toString: String = s"$priceId,$priceDate,$country,$price"
}

Нетрудно преобразовать каждый объект List [String] в объект этого класса (я могу сделатьуже), но в этом случае моя задача состоит в том, чтобы получить несколько пользовательских объектов из каждого списка List [String] .

Например, список, содержащий

"0000002", "11-05-08-2016", "0.92", "1.68", "0.81"

следует преобразовать в:

  • PriceInfo ("0000002", "11-05-08-2016", "USA", "0,92")
  • PriceInfo ("0000002", "11-05-08-2016", "Великобритания", "1.68")
  • PriceInfo ("0000002", "11-05-08-2016", "AUS", "0.81").

И каждый List [String] в моей RDD [List [String]] должен быть «разделен» на несколько объектов PriceInfo таким же образом.

Результатом должен быть RDD[PriceInfo].

Единственное решение, которое пришло ко мнеind должен выполнить итерацию RDD [List [String]] с помощью функции foreach () , создать 3 объекта PriceInfo на каждой итерации, затем добавить все созданные объекты в List [PriceObjects] и использовать этоСписок результатов в SparkContext.parallelize (List ...) .

Примерно так:

rawPricesList.foreach(list => {

      //...create PriceInfo1 from list
      //...create PriceInfo2 from list
      //...create PriceInfo3 from list

      //...add them all to result List<PriceInfo>

    })

    //...sc.parallelize(List<PriceInfo>...)

Но такое решение имеет много недостатков.

Главное, что он не будет работать, если у нас нет ссылки на SparkContext.Например, если у нас будет метод getPrices (), который будет иметь только 1 параметр - RDD [List [String]].

def getPrices(rawPricesList: RDD[List[String]]): RDD[PriceInfo] = {



    rawPricesList.foreach(list => {

      //...create PriceInfo1 from list
      //...create PriceInfo2 from list
      //...create PriceInfo3 from list

      //...add them all to result List<PriceInfo>

    })

    //...but we can't sc.parallelize(List...) here, because there is no SparkContext sc in method parameters
  }

Кроме того, мне кажется, что Scala содержит более элегантное решение.

Я пытался найти похожие образцы в книгах "Scala для нетерпеливых" и "Изучение искры: молниеносный анализ больших данных", но, к сожалению, не нашел ничего подобного этому случаю.Буду очень признателен за помощь и советы.

1 Ответ

0 голосов
/ 27 августа 2018

Вот один из подходов:

  1. Загрузите текстовый файл и разбейте каждую строку на массив [String] из (id, date, price1, price2, price3)
  2. Преобразуйте каждыйстрока в (id, date, Array [(country, numericPrice)]) с использованием zip
  3. Сведение кортежей (country, numericPrice) в каждой строке в ряды PriceInfo объектов с использованием flatMap

Пример кода ниже:

case class PriceInfo(priceId: String, priceDate: String, country: String, price: Double) {
  override def toString: String = s"$priceId,$priceDate,$country,$price"
}

val countryList = List("USA", "UK", "AUS")

val rdd = sc.textFile("/path/to/textfile").
  map( _.split(",") ).
  map{ case Array(id, date, p1, p2, p3) =>
    (id, date, countryList.zip(List(p1.toDouble, p2.toDouble, p3.toDouble)))
  }.
  flatMap{ case (id, date, countryPrices) =>
    countryPrices.map( cp => PriceInfo(id, date, cp._1, cp._2) ) 
  }
// rdd: org.apache.spark.rdd.RDD[PriceInfo] = ...

rdd.collect
// res1: Array[PriceInfo] = Array(
//    0000002,11-05-08-2016,USA,0.92,
//    0000002,11-05-08-2016,UK,1.68,
//    0000002,11-05-08-2016,AUS,0.81,
//    0000003,07-05-08-2016,USA,0.8,
//    0000003,07-05-08-2016,UK,1.05,
//    0000003,07-05-08-2016,AUS,1.49,
//    0000008,07-05-08-2016,USA,1.43,
//    0000008,07-05-08-2016,UK,1.29,
//    0000008,07-05-08-2016,AUS,1.22
// )
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...