Обновление СДР Объектов - PullRequest
1 голос
/ 18 июня 2019

Я только начал изучать scala и столкнулся с некоторыми проблемами, связанными с манипулированием RDD объектов.

У меня та же проблема, которая указана в приведенной ниже ссылке

Обновление внутреннего состояния элементов СДР

Есть ли другой способ достиженияРешение проблемы указано в приведенной выше ссылке?Также возможно ли использовать набор данных или фрейм данных для достижения того, что мы пытаемся сделать?

1 Ответ

3 голосов
/ 19 июня 2019

Неизменяемость - одна из ключевых концепций функционального программирования. Вы не можете изменить RDD или данные внутри, но вы можете создать новые RDD на основе данных из старых RDD.

Я изменил пример по ссылке в вашем вопросе, чтобы показать, как обычно выглядит такое преобразование.

//just case class with foo and bar fields that can be empty.
case class Test (foo: Option[Double], bar: Option[Double], someOtherVal: String)

// as you can see this is not actually "update"
// it creates new Test with "updated" foo and bar fields 
// NOTE: this logic usually lives outside data object 
def updateFooBar(t: Test) = Test(Some(Math.random()), Some(Math.random()),t.someOtherVal)


val testList = Array.fill(5)(Test(None,None,"someString"))
val testRDD = sc.parallelize(testList)

//creates new RDD based on old one by applying updateFooBar to each element. 
val newRdd = testRDD.map{ x => updateFooBar(x) }
//or just  val newRdd = testRDD.map(updateFooBar)

newRdd.collect().foreach { x=> println(x.foo+"~"+x.bar+"~"+x.someOtherVal) }

Вы можете преобразовать Dataset точно так же, как RDD:

val newDs = testRDD.toDS().map( x => updateFooBar(x))

или используя Dataframe:

import org.apache.spark.sql.functions.typedLit

val newDf = testRDD.toDF()
  .withColumn("foo",typedLit(Some(Math.random())))
  .withColumn("bar",typedLit(Some(Math.random())))
...