Я действительно не понимаю, что все, что вы хотите сделать ...
Но, так как вы сказали, что учитесь, я постараюсь объяснить все шаг за шагом - надеюсь, это поможет вам.
Во-первых, как совет от коллеги, который перешел с Java на Scala пару лет назад.Избегайте всех мутаций, насколько это возможно, заставьте себя думать и программировать функциональным способом - таким образом, используйте val
вместо var
и неизменных коллекций вместо mutable ones.
Во-вторых, по возможности избегайте использования вещей типа Any
, например, здесь ...
var src_rdd = src_policy_df.rdd.map(_.toSeq.toList)
... вы можете получать значения, которые вы хотите получить от каждого Row
в более типизированном виде, например:
val src_rdd = src_policy_df.rdd.map { row =>
(
row.getAs[Int](fieldName = "policy_id"),
row.getAs[String](fieldName = "otherdetails")
)
}
// src_rdd: RDD[(Int, String)]
Или, что еще лучше, используйте Dataset
( Типизированный DataFrame) .
import spark.implicits._ // spark is an instance of SparkSession
final case class Policy(policy_id: Int, otherdetails: String)
val src_dataset = src_policy_df.as[Policy] // implicit Encoder needed here, provided by the spark implicits.
В Spark вы никогда не должны collect
свои данные - за исключением последнего шага вашего конвейера вычислений (и в большинстве случаев это делается только на этапе отладки,потому что, как правило, вы сохраняете его во внешнем хранилище данных, например HDFS или mongo) , или если вы уверены, что у вас есть маленький RDD
, который вы хотите сделать доступным для других преобразований в виде таблицы поиска или чего-то подобного (например, это очень часто встречается на RDD с сокращенной парой, поэтому существует reduceByKeyLocally
метод, который вернет Map ) .
Почему?- Поскольку collect
переносит все данные, которые были распределены на Executors , в Driver , это означает, что вы больше не используете платформу для распараллеливания ваших вычислений.
Что выследует построить ваши вычисления, используя Преобразования , предоставленные Spark, например map
.
val orig_pol_id = 10110000
val ref_surr_id = 1345678
// Using RDDs.
val src_policy_final_rdd = src_rdd.map {
case (id, otherdetails) if (id == orig_pol_id) => (ref_surr_id, otherdetails)
case policy => policy // default case, nothing change.
}
// Using Datasets.
val src_policy_final_dataset = src_dataset.map {
case policy if (policy.id == orig_pol_id) => policy.copy(id = ref_surr_id) // the copy method returns a new policy with the provided fields changed.
case policy => policy // default case, nothing change.
}
Наконец, при записи RDD
в HDFS ,он использует значение по умолчанию toString
для каждого элемента для печати каждой строки.Поэтому вам может потребоваться отформатировать его перед сохранением.
val write_rdd = src_policy_final_rdd.map {
case (id, otherdetails) => s"$id,$otherdetails"
}
// wirte_rdd: RDD[String]
src_write.saveAsTextFile("sparktest/pol_det")
Или, если вы используете Dataset
, вы можете использовать DataframeWriter api, чтобы обработать все это для вас, (Рекомендуется)
src_policy_final_dataset
.write
.option("header", "true")
.option("sep", ",") // ',' is the default separator, but I prefer to be specific.
.csv("sparktest/pol_det")
Это должно ответить на все ваши вопросы.
PS: Два заключительных замечания.
Фрист, в общем, этот вопрос "too board" за то, что вас спросили / ответили в SO - таким образом, попытайтесь ограничить свою сферу и быть более ясным в следующий раз;).
И, вы можете сначала прочитать о Spark иделать быстрые уроки, чтобы почувствовать себя более комфортно с фреймворком - кстати, это - короткая искра мастерская я сделал для офиса несколько дней назад, он был предназначен для разработчиков не-Scala,надеюсь, это вам тоже поможет :) 1081 *