Как выполнить действие только при поступлении нового непустого сообщения из темы KAFKA в структурированном потоке? - PullRequest
0 голосов
/ 11 июля 2019

У меня есть статический Dataframe (CSV-файл) и тема KAFKA (скажем, Topic1) в качестве входных данных для объединения их обоих и выполнения необходимых преобразований и обратной записи в другую тему.

Новая задача:будет дополнительной темой (Topic2), которая уведомит об изменениях в CSV-файле.всякий раз, когда есть сообщение от Topic2, я должен перечитывать статический Dataframe, присоединять их к сообщению Kafka (из Topic1).

Как обусловить чтение части Dataframe, только когда есть сообщение, пришедшее из Topic2?Пожалуйста, спросите меня, нужно ли мне быть более ясным.

Я не могу придумать ни одного DStream-эквивалента "foreachRDD {... rdd.count> 0 ..}" в структурированном потоке, так как queryStream вычисляется лениво,Поэтому я не могу выполнить никаких действий, чтобы определить, доставил ли Topic2 сообщение или нет.

Ниже приведен мой пример кода:

var staticDF = getLookupDataframe(spark,filePath)

val topic1DataStream =spark
        .readStream
        .format("kafka")
        .options(KAFKA_Topic1_Properties)
        .load()
        .select(from_json(col("value").cast("string"), schemaForJson).as("parseddata"))
        .select("parseddata.*")

val topic2DataStream =spark
        .readStream
        .format("kafka")
        .options(KAFKA_Topic2_Properties)
        .load()
        .select(from_json(col("value").cast("string"), schemaForJson).as("parseddata"))
        .select("parseddata.*")


if(/*condition to determine the Topic2 message arrival*/)
{
staticDF = getLookupDataframe(spark,filePath)
}

doSomeOperations(staticDF,topic1DataStream)




def getLookupDataframe(spark: SparkSession, path: String) :DataFrame = {
    spark
      .read
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .option("delimiter", " ")
      .load(path)
      .toDF("col1", "col2", "col3")
  }
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...