У меня есть сценарий использования, в котором я использую данные из Кафки с использованием потоковой структурированной искры.У меня есть несколько тем для подписки, и на основе имени topic датафрейм должен быть выгружен в определенное место (другое место для разных тем).Я видел, может ли это быть решено с помощью какой-либо функции разделения / фильтрации в кадре данных spark, но не смог найти ни одной.
На данный момент у меня есть подписка только на одну тему, и я использую свой собственный письменный метод для выгрузки данных в место в формате паркета.Вот код, который я сейчас использую:
def save_as_parquet(cast_dataframe: DataFrame,output_path:
String,checkpointLocation: String): Unit = {
val query = cast_dataframe.writeStream
.format("parquet")
.option("failOnDataLoss",true)
.option("path",output_path)
.option("checkpointLocation",checkpointLocation)
.start()
.awaitTermination()
}
Когда я буду подписан на разные темы, этот cast_dataframe также будет иметь значения из разных тем.Я хочу сбросить данные из темы только в то место, где ей назначено местоположение.Как это можно сделать?