Я читаю поток данных из темы kafka, используя структурированную потоковую передачу в режиме обновления., А затем выполняю некоторые преобразования.
Затем я создал приемник jdbc для отправки данных в приемник mysql в режиме добавления.Проблема заключается в том, как сказать моему приемнику, чтобы он знал, что это мой первичный ключ, и выполнить обновление на его основе, чтобы в моей таблице не было повторяющихся строк.
val df: DataFrame = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<List-here>")
.option("subscribe", "emp-topic")
.load()
import spark.implicits._
// value in kafka is bytes so cast it to String
val empList: Dataset[Employee] = df.
selectExpr("CAST(value AS STRING)")
.map(row => Employee(row.getString(0)))
// window aggregations on 1 min windows
val aggregatedDf= ......
// How to tell here that id is my primary key and do the update
// based on id column
aggregatedDf
.writeStream
.trigger(Trigger.ProcessingTime(60.seconds))
.outputMode(OutputMode.Update)
.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF
.select("id", "name","salary","dept")
.write.format("jdbc")
.option("url", "jdbc:mysql://localhost/empDb")
.option("driver","com.mysql.cj.jdbc.Driver")
.option("dbtable", "empDf")
.option("user", "root")
.option("password", "root")
.mode(SaveMode.Append)
.save()
}