Запись значений ListBuffer [List [Any]] в CSV с использованием spark и scala - PullRequest
0 голосов
/ 28 ноября 2018

Я перефразировал свой вопрос.

Я изучаю скалу и искру.Мне известно о создании RDD из файла CSV вместо создания DF и преобразовании его в RDD.Но я пробую приведенную ниже комбинацию.

Создание scala ListBuffer, Spark Dataframe и преобразование его в RDD:

scala> import scala.collection.mutable.ListBuffer
import scala.collection.mutable.ListBuffer

scala> var src_policy_final = new ListBuffer[List[Any]]
src_policy_final: scala.collection.mutable.ListBuffer[List[Any]] = ListBuffer()

scala> var src_policy_final = new ListBuffer[List[Any]]
src_policy_final: scala.collection.mutable.ListBuffer[List[Any]] = ListBuffer()

scala> var src_policy_df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("sparktest/policy_details.csv")
src_policy_df: org.apache.spark.sql.DataFrame = [policy_id: int, otherdetails: string]

scala> var src_rdd = src_policy_df.rdd.map(_.toSeq.toList)
src_rdd: org.apache.spark.rdd.RDD[List[Any]] = MapPartitionsRDD[40] at map at <console>:26

scala> var src_pol_list = src_rdd.collect.toList
src_pol_list: List[List[Any]] = List(List(10110000, This is the first policy), List(10456200, This is the second policy), List(10345300, This is the third policy))

Использование scala для цикла Я повторяю записи Spark RDD для заменызначение столбца (policy_id с surrogateId), как показано ниже -

scala> for(pol_details <- src_pol_list){
     | src_policy_final += pol_details.toList.map(e => if(e==10110000) 1345678 else e)
     | }

Я изменяю значения конкретного столбца записи с помощью .map(e => if(e==orig_pol_id) ref_surr_id else e) и добавляю записи в ListBuffer[List[Any]].После того, как я завершу итерацию и сгенерирую все записи в RDD, я запишу значения ListBuffer[Lis[Any]] в виде файла csv в файловую систему HDFS, используя функцию SaveAsTextFile("/sparktest/policy_details")

Когда я выполняю println из src_policy_final, вывод будет:

    scala> println(src_policy_final)
ListBuffer(List(1345678, This is the first policy), List(10456200, This is the second policy), List(10345300, This is the third policy))

Теперь я записываю измененные данные обратно в файловую систему HDFS, преобразовывая ListBuffer [ListAny]] в RDD:

scala> var src_write = sc.parallelize(src_policy_final.toList)
src_write: org.apache.spark.rdd.RDD[List[Any]] = ParallelCollectionRDD[43] at parallelize at <console>:53

Запись в файловую систему HDFS:

scala> src_write.saveAsTextFile("sparktest/pol_det")

Выходные данные выглядят так:

List(1345678, This is the first policy)
List(10456200, This is the second policy)
List(10345300, This is the third policy)

Вывод, который хотелось бы получить:

1345678, This is the first policy
10456200, This is the second policy
10345300, This is the third policy

Я не уверен, как загрузить выводсогласно моему требованию.

Надеюсь, я дал лучшее объяснение о том, чего я пытаюсь достичь. Не могли бы вы помочь?

1 Ответ

0 голосов
/ 03 декабря 2018

Я действительно не понимаю, что все, что вы хотите сделать ...
Но, так как вы сказали, что учитесь, я постараюсь объяснить все шаг за шагом - надеюсь, это поможет вам.

Во-первых, как совет от коллеги, который перешел с Java на Scala пару лет назад.Избегайте всех мутаций, насколько это возможно, заставьте себя думать и программировать функциональным способом - таким образом, используйте val вместо var и неизменных коллекций вместо mutable ones.

Во-вторых, по возможности избегайте использования вещей типа Any, например, здесь ...

var src_rdd = src_policy_df.rdd.map(_.toSeq.toList)

... вы можете получать значения, которые вы хотите получить от каждого Row в более типизированном виде, например:

val src_rdd = src_policy_df.rdd.map { row =>
   (
      row.getAs[Int](fieldName = "policy_id"),
      row.getAs[String](fieldName = "otherdetails")
   )
}
// src_rdd: RDD[(Int, String)]

Или, что еще лучше, используйте Dataset ( Типизированный DataFrame) .

import spark.implicits._ // spark is an instance of SparkSession
final case class Policy(policy_id: Int, otherdetails: String)
val src_dataset = src_policy_df.as[Policy] // implicit Encoder needed here, provided by the spark implicits.

В Spark вы никогда не должны collect свои данные - за исключением последнего шага вашего конвейера вычислений (и в большинстве случаев это делается только на этапе отладки,потому что, как правило, вы сохраняете его во внешнем хранилище данных, например HDFS или mongo) , или если вы уверены, что у вас есть маленький RDD, который вы хотите сделать доступным для других преобразований в виде таблицы поиска или чего-то подобного (например, это очень часто встречается на RDD с сокращенной парой, поэтому существует reduceByKeyLocally метод, который вернет Map ) .
Почему?- Поскольку collect переносит все данные, которые были распределены на Executors , в Driver , это означает, что вы больше не используете платформу для распараллеливания ваших вычислений.
Что выследует построить ваши вычисления, используя Преобразования , предоставленные Spark, например map.

val orig_pol_id = 10110000
val ref_surr_id = 1345678

// Using RDDs.
val src_policy_final_rdd = src_rdd.map {
  case (id, otherdetails) if (id == orig_pol_id) => (ref_surr_id, otherdetails)
  case policy => policy // default case, nothing change.
}

// Using Datasets.
val src_policy_final_dataset = src_dataset.map {
  case policy if (policy.id == orig_pol_id) => policy.copy(id = ref_surr_id) // the copy method returns a new policy with the provided fields changed.
  case policy => policy // default case, nothing change.
}

Наконец, при записи RDD в HDFS ,он использует значение по умолчанию toString для каждого элемента для печати каждой строки.Поэтому вам может потребоваться отформатировать его перед сохранением.

val write_rdd = src_policy_final_rdd.map {
   case (id, otherdetails) => s"$id,$otherdetails"
}
// wirte_rdd: RDD[String]
src_write.saveAsTextFile("sparktest/pol_det")

Или, если вы используете Dataset, вы можете использовать DataframeWriter api, чтобы обработать все это для вас, (Рекомендуется)

src_policy_final_dataset
  .write
  .option("header", "true")
  .option("sep", ",") // ',' is the default separator, but I prefer to be specific.
  .csv("sparktest/pol_det")

Это должно ответить на все ваши вопросы.

PS: Два заключительных замечания.
Фрист, в общем, этот вопрос "too board" за то, что вас спросили / ответили в SO - таким образом, попытайтесь ограничить свою сферу и быть более ясным в следующий раз;).
И, вы можете сначала прочитать о Spark иделать быстрые уроки, чтобы почувствовать себя более комфортно с фреймворком - кстати, это - короткая искра мастерская я сделал для офиса несколько дней назад, он был предназначен для разработчиков не-Scala,надеюсь, это вам тоже поможет :) 1081 *

...