Я пытаюсь прочитать файл из корзины s3 строка за строкой, преобразовываю df в RDD и передаю эти значения в переменные.Теперь я передаю эти переменные в качестве параметров в мой SQL-запрос.
Мой код:
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.typedLit
import org.apache.spark.sql.functions._
object FileTest {
def main(args: Array[String]) {
val spark = SparkSession.builder.master("local[*]").appName("FileTest").getOrCreate()
val sc = spark.sparkContext
val conf = new SparkConf().setAppName("FileTest").setMaster("local[*]")
val sqlContext = spark.sqlContext
val fileread = spark.read.format("csv").option("inferSchhema","true").option("header","false").load("C:\\data\\FileParam.txt")
fileread.createOrReplaceTempView("Tempdata")
val filedata = spark.sql("select * from tempdata")
filedata.createOrReplaceTempView("filedata")
val rdd1=filedata.rdd
rdd1.collect().foreach{
x=>
println(s"""select * from temp where id=${x(0)} and cid=${x(1)}, path1=${x(2)},path2=${x(3)}""")
val id = x(0)
val cid = x(1)
val path1 = x(2)
val path2= x(3)
println(s""" values are ${id},${cid},${path1},${path2}""")
val resfile = spark.sql(s"select * from filedata where _c1= ${cid}")
val fileOutput = filedata.withColumn("x4", typedLit("OK")).write.format("csv").mode("append").save("s3a://test-bucket/TestFile/Test_result/Result.txt")
}
}
spark.stop()
}
}
Мой вопрос здесь, к файлу FileParam.txt каждая строка будет добавляться каждый раз, и я хочу прочитать данные только самой последней, а не всех записейв цикле.
Мой файл выглядит следующим образом:
FileParam.txt:
1,1001,s3a://test-bucket/folder1,s3a://test-bucket/folder2
2,1002,s3a://test-bucket/folder3,s3a://test-bucket/folder4
Предположим, что мой процесс прочитал все вышеуказанные записи, и еще одна новая записьПосле этого я должен прочитать только последнюю запись, а не все 3 записи.
Пожалуйста, помогите мне, как решить эту проблему.