Я использую Kakfa и Spark, мой выход (df1) - потоковый Dataframe, я бы хотел сохранить его в MongoDB.Какие-либо предложения?Большое спасибо!
val df= lines.selectExpr("CAST(value AS STRING)").as[(String)]
.select(from_json($"value", DFschema).as("data"))
.select("data.*")
.writeStream
.format("console")
.option("truncate", "false")
.start()
.awaitTermination()
df1 = df.filter($"COLUMN".isin(listA: _*))
// save df1 into MongoDB
//MongoSpark.save()...