Выполните набор операций в каждой строке кадра данных - PullRequest
1 голос
/ 28 сентября 2019

У меня есть датафрейм, загруженный из JSON.Имеет 10 строк и 10 столбцов.Я должен выполнить приведенный ниже набор шагов для каждой строки.

 - Call an API by extracting 4 columns from each row. The API will return corrected values for these 4 columns.
 - Replace the returned value of these 4 columns with the original ones and keep other 6 in their original values.
 - Write the updated dataframe in a JSON.

По отдельности эти три шага выполняются и вместе они достигаются, если у меня есть одна строка в кадре данных.Но я не могу выполнить один и тот же набор шагов для всех 10 строк моего информационного кадра.

Какой эффективный и точный способ сделать это?

Пример заглушки кода:

object Dummy {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("My App")
      .master("local[*]").getOrCreate()

    val df1 = spark.read.json("D:\\test.json")
    // Assuming below xml is response of API

    val xml = "xml response"

    val xmlElem: Elem = scala.xml.XML.loadString(xml)
    val rootTag = xmlElem \\ "rootTag"
    val tag1 = rootTag \ "tag1"
    val tag2 = rootTag \ "tag2"
    val tag3 = rootTag \ "tag3"
    val tag4 = rootTag \ "tag4"

    import spark.implicits._
    val dfx = Seq(
      (tag1.text, tag2.text, tag3.text, tag4.text)).toDF("tag1", "tag2", "tag3", "tag4")

    // Below step is used to replace tag1 to tag4 with original df and recreate a new df.

    val rows1 = df1.rdd.zipWithIndex().map {
      case (r: Row, tempCol: Long) => Row.fromSeq(tempCol +: r.toSeq)
    }
    val tempDF1 = spark.createDataFrame(rows1, StructType(StructField("tempCol", LongType, false) +: df1.schema.fields))

    tempDF1.show()
    val rows2 = dfx.rdd.zipWithIndex().map {
      case (r: Row, tempCol: Long) => Row.fromSeq(tempCol +: r.toSeq)
    }
    val tempDF2 = spark.createDataFrame(rows2, StructType(StructField("tempCol", LongType, false) +: dfx.schema.fields))

    tempDF2.show()
    val final = tempDF1.drop("tag1", "tag2", "tag3", "tag4")
      .join(tempDF2, "tempCol")
      .drop("tempCol")

    final.show()  
  }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...