Я пытаюсь читать потоковые данные в режиме реального времени из тем Kafka через структурированную потоковую передачу Spark. Однако, насколько я понимаю, мне понадобится остановить потоковую передачу когда-нибудь, чтобы я мог применить к ней свою логику синтаксического анализа и передать ее в MongoDB.Есть ли способ сохранить потоковые данные в отдельный фрейм данных с / без остановки потоковой передачи?
Я проверил руководство и другие блоги и не получил прямого ответа на мое требование
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host:9092, host:9092, host:9092")
.option("subscribe", "TOPIC_P2_R2, TOPIC_WITH_COMP_P2_R2.DIT, TOPIC_WITHOUT_COMP_P2_R2.DIT")
.option("startingOffsets", "earliest")
.load()
val dfs = df.selectExpr("CAST(value AS STRING)")
val consoleOutput = dfs.writeStream
.outputMode("append")
.format("console")
.start()
consoleOutput.awaitTermination()
consoleOutput.stop()
Мне нужно, чтобы потоковые данные были каким-то образом сохранены в кадре данных либо путем остановки потоковой передачи, либо без остановки
Ниже приведена логика синтаксического анализа, которая у меня есть, и вместо выбора набора данных из нужного мне пути к файлупотоковые данные должны быть моим новым набором данных и должны быть в состоянии применить мою остальную логику и получить вывод.Сохранение его в Mongo сейчас не моя основная задача;
val log = spark.read.format("csv")
.option("inferSchema", "true")
.option("header", "true")
.load("C:\\Users\\raheem_mohammed\\IdeaProjects\\diag.csv")
log.createOrReplaceTempView("logs")
val df = spark.sql("select _raw, _time from logs").toDF
//Adds Id number to each of the event
val logs = dfs.withColumn("Id", monotonicallyIncreasingId()+1)
//Register Dataframe as a temp table
logs.createOrReplaceTempView("logs")
val = spark.sql("select Id, value from logs")
//Extracts columns from _raw column. Also finds the probabilities of compositeNames.
//If true then the compositeName belongs to one of the four possibilities
val extractedDF = dfss.withColumn("managed_server", regexp_extract($"_raw", "\\[(.*?)\\] \\[(.*?)\\]",2))
.withColumn("alert_summary", regexp_extract($"_raw", "\\[(.*?)\\] \\[(.*?)\\] \\[(.*?)\\]",3))
.withColumn("oracle_details", regexp_extract($"_raw", "\\[(.*?)\\] \\[(.*?)\\] \\[(.*?)\\] \\[(.*?)\\] \\[(.*?)\\]",5))
.withColumn("ecid", regexp_extract($"_raw", "(?<=ecid: )(.*?)(?=,)",1))
//.withColumn("CompName",regexp_extract($"_raw",""".*(composite_name|compositename|composites|componentDN):\s+(\S+)\]""",2))
.withColumn("CompName",regexp_extract($"_raw",""".*(composite_name|compositename|composites|componentDN):\s+([a-zA-Z]+)""",2))
.withColumn("composite_name", col("_raw").contains("composite_name"))
.withColumn("compositename", col("_raw").contains("compositename"))
.withColumn("composites", col("_raw").contains("composites"))
.withColumn("componentDN", col("_raw").contains("componentDN"))
//Filters out any NULL values if found
val finalData = extractedDF.filter(
col("managed_server").isNotNull &&
col("alert_summary").isNotNull &&
col("oracle_details").isNotNull &&
col("ecid").isNotNull &&
col("CompName").isNotNull &&
col("composite_name").isNotNull &&
col("compositename").isNotNull &&
col("composites").isNotNull &&
col("componentDN").isNotNull)
finalData.show(false)