У меня есть статический Dataframe (CSV-файл) и тема KAFKA (скажем, Topic1) в качестве входных данных для объединения их обоих и выполнения необходимых преобразований и обратной записи в другую тему.
Новая задача:будет дополнительной темой (Topic2), которая уведомит об изменениях в CSV-файле.всякий раз, когда есть сообщение от Topic2, я должен перечитывать статический Dataframe, присоединять их к сообщению Kafka (из Topic1).
Как обусловить чтение части Dataframe, только когда есть сообщение, пришедшее из Topic2?Пожалуйста, спросите меня, нужно ли мне быть более ясным.
Я не могу придумать ни одного DStream-эквивалента "foreachRDD {... rdd.count> 0 ..}" в структурированном потоке, так как queryStream вычисляется лениво,Поэтому я не могу выполнить никаких действий, чтобы определить, доставил ли Topic2 сообщение или нет.
Ниже приведен мой пример кода:
var staticDF = getLookupDataframe(spark,filePath)
val topic1DataStream =spark
.readStream
.format("kafka")
.options(KAFKA_Topic1_Properties)
.load()
.select(from_json(col("value").cast("string"), schemaForJson).as("parseddata"))
.select("parseddata.*")
val topic2DataStream =spark
.readStream
.format("kafka")
.options(KAFKA_Topic2_Properties)
.load()
.select(from_json(col("value").cast("string"), schemaForJson).as("parseddata"))
.select("parseddata.*")
if(/*condition to determine the Topic2 message arrival*/)
{
staticDF = getLookupDataframe(spark,filePath)
}
doSomeOperations(staticDF,topic1DataStream)
def getLookupDataframe(spark: SparkSession, path: String) :DataFrame = {
spark
.read
.format("com.databricks.spark.csv")
.option("header", "true")
.option("delimiter", " ")
.load(path)
.toDF("col1", "col2", "col3")
}