У меня есть данные во входном текстовом файле.Он содержит входные данные в формате: «PriceId, DateTime, PriceForUSA, PriceForUK, PriceForAUS».
Выглядит это так:
0000002,11-05-08-2016,0.92,1.68,0.81
0000003,07-05-08-2016,0.80,1.05,1.49
0000008,07-05-08-2016,1.43,1.29,1.22
Список стран фиксирован (США, Великобритания, AUS), и порядок цен в строках тоже фиксирован (PriceForUSA, PriceForUK, PriceForAUS).
Я читаю эти данные из файла, используя Spark Context, и преобразую их в RDD [List [String [].Каждый список в моей RDD представляет одну строку из входного текстового файла.
Например,
первый список содержит строки
"0000002", "11-05-08-2016", "0.92", "1.68", "0.81"
второй список содержит строки
"0000003", "07-05-08-2016" , "0.80", "1.05" , "1.49"
и т. Д.
У меня также есть собственный класс PriceInfo
case class PriceInfo(priceId: String, priceDate: String, country: String, price: Double) {
override def toString: String = s"$priceId,$priceDate,$country,$price"
}
Нетрудно преобразовать каждый объект List [String] в объект этого класса (я могу сделатьуже), но в этом случае моя задача состоит в том, чтобы получить несколько пользовательских объектов из каждого списка List [String] .
Например, список, содержащий
"0000002", "11-05-08-2016", "0.92", "1.68", "0.81"
следует преобразовать в:
- PriceInfo ("0000002", "11-05-08-2016", "USA", "0,92")
- PriceInfo ("0000002", "11-05-08-2016", "Великобритания", "1.68")
- PriceInfo ("0000002", "11-05-08-2016", "AUS", "0.81").
И каждый List [String] в моей RDD [List [String]] должен быть «разделен» на несколько объектов PriceInfo таким же образом.
Результатом должен быть RDD[PriceInfo].
Единственное решение, которое пришло ко мнеind должен выполнить итерацию RDD [List [String]] с помощью функции foreach () , создать 3 объекта PriceInfo на каждой итерации, затем добавить все созданные объекты в List [PriceObjects] и использовать этоСписок результатов в SparkContext.parallelize (List ...) .
Примерно так:
rawPricesList.foreach(list => {
//...create PriceInfo1 from list
//...create PriceInfo2 from list
//...create PriceInfo3 from list
//...add them all to result List<PriceInfo>
})
//...sc.parallelize(List<PriceInfo>...)
Но такое решение имеет много недостатков.
Главное, что он не будет работать, если у нас нет ссылки на SparkContext.Например, если у нас будет метод getPrices (), который будет иметь только 1 параметр - RDD [List [String]].
def getPrices(rawPricesList: RDD[List[String]]): RDD[PriceInfo] = {
rawPricesList.foreach(list => {
//...create PriceInfo1 from list
//...create PriceInfo2 from list
//...create PriceInfo3 from list
//...add them all to result List<PriceInfo>
})
//...but we can't sc.parallelize(List...) here, because there is no SparkContext sc in method parameters
}
Кроме того, мне кажется, что Scala содержит более элегантное решение.
Я пытался найти похожие образцы в книгах "Scala для нетерпеливых" и "Изучение искры: молниеносный анализ больших данных", но, к сожалению, не нашел ничего подобного этому случаю.Буду очень признателен за помощь и советы.