У меня есть датафрейм, загруженный из JSON.Имеет 10 строк и 10 столбцов.Я должен выполнить приведенный ниже набор шагов для каждой строки.
- Call an API by extracting 4 columns from each row. The API will return corrected values for these 4 columns.
- Replace the returned value of these 4 columns with the original ones and keep other 6 in their original values.
- Write the updated dataframe in a JSON.
По отдельности эти три шага выполняются и вместе они достигаются, если у меня есть одна строка в кадре данных.Но я не могу выполнить один и тот же набор шагов для всех 10 строк моего информационного кадра.
Какой эффективный и точный способ сделать это?
Пример заглушки кода:
object Dummy {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder
.appName("My App")
.master("local[*]").getOrCreate()
val df1 = spark.read.json("D:\\test.json")
// Assuming below xml is response of API
val xml = "xml response"
val xmlElem: Elem = scala.xml.XML.loadString(xml)
val rootTag = xmlElem \\ "rootTag"
val tag1 = rootTag \ "tag1"
val tag2 = rootTag \ "tag2"
val tag3 = rootTag \ "tag3"
val tag4 = rootTag \ "tag4"
import spark.implicits._
val dfx = Seq(
(tag1.text, tag2.text, tag3.text, tag4.text)).toDF("tag1", "tag2", "tag3", "tag4")
// Below step is used to replace tag1 to tag4 with original df and recreate a new df.
val rows1 = df1.rdd.zipWithIndex().map {
case (r: Row, tempCol: Long) => Row.fromSeq(tempCol +: r.toSeq)
}
val tempDF1 = spark.createDataFrame(rows1, StructType(StructField("tempCol", LongType, false) +: df1.schema.fields))
tempDF1.show()
val rows2 = dfx.rdd.zipWithIndex().map {
case (r: Row, tempCol: Long) => Row.fromSeq(tempCol +: r.toSeq)
}
val tempDF2 = spark.createDataFrame(rows2, StructType(StructField("tempCol", LongType, false) +: dfx.schema.fields))
tempDF2.show()
val final = tempDF1.drop("tag1", "tag2", "tag3", "tag4")
.join(tempDF2, "tempCol")
.drop("tempCol")
final.show()
}
}