Как я могу сохранить Kafka для чтения структурированных потоковых данных как Dataframe и применить к нему синтаксический анализ? - PullRequest
0 голосов
/ 03 апреля 2019

Я пытаюсь читать потоковые данные в режиме реального времени из тем Kafka через структурированную потоковую передачу Spark. Однако, насколько я понимаю, мне понадобится остановить потоковую передачу когда-нибудь, чтобы я мог применить к ней свою логику синтаксического анализа и передать ее в MongoDB.Есть ли способ сохранить потоковые данные в отдельный фрейм данных с / без остановки потоковой передачи?

Я проверил руководство и другие блоги и не получил прямого ответа на мое требование

val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host:9092, host:9092, host:9092")
.option("subscribe", "TOPIC_P2_R2, TOPIC_WITH_COMP_P2_R2.DIT, TOPIC_WITHOUT_COMP_P2_R2.DIT")
.option("startingOffsets", "earliest")
.load()

val dfs = df.selectExpr("CAST(value AS STRING)")

val consoleOutput = dfs.writeStream
.outputMode("append")
.format("console")
.start()
consoleOutput.awaitTermination()
consoleOutput.stop()

Мне нужно, чтобы потоковые данные были каким-то образом сохранены в кадре данных либо путем остановки потоковой передачи, либо без остановки

Ниже приведена логика синтаксического анализа, которая у меня есть, и вместо выбора набора данных из нужного мне пути к файлупотоковые данные должны быть моим новым набором данных и должны быть в состоянии применить мою остальную логику и получить вывод.Сохранение его в Mongo сейчас не моя основная задача;

val log = spark.read.format("csv")
  .option("inferSchema", "true")
  .option("header", "true")
  .load("C:\\Users\\raheem_mohammed\\IdeaProjects\\diag.csv")
log.createOrReplaceTempView("logs")
val df = spark.sql("select _raw, _time from logs").toDF

//Adds Id number to each of the event
val logs = dfs.withColumn("Id", monotonicallyIncreasingId()+1)
//Register Dataframe as a temp table
logs.createOrReplaceTempView("logs")

val  = spark.sql("select Id, value from logs")


//Extracts columns from _raw column. Also finds the probabilities of compositeNames.
//If true then the compositeName belongs to one of the four possibilities
val extractedDF = dfss.withColumn("managed_server", regexp_extract($"_raw", "\\[(.*?)\\] \\[(.*?)\\]",2))
  .withColumn("alert_summary", regexp_extract($"_raw", "\\[(.*?)\\] \\[(.*?)\\] \\[(.*?)\\]",3))
  .withColumn("oracle_details", regexp_extract($"_raw", "\\[(.*?)\\] \\[(.*?)\\] \\[(.*?)\\] \\[(.*?)\\] \\[(.*?)\\]",5))
  .withColumn("ecid", regexp_extract($"_raw", "(?<=ecid: )(.*?)(?=,)",1))
  //.withColumn("CompName",regexp_extract($"_raw",""".*(composite_name|compositename|composites|componentDN):\s+(\S+)\]""",2))
  .withColumn("CompName",regexp_extract($"_raw",""".*(composite_name|compositename|composites|componentDN):\s+([a-zA-Z]+)""",2))
  .withColumn("composite_name", col("_raw").contains("composite_name"))
  .withColumn("compositename", col("_raw").contains("compositename"))
  .withColumn("composites", col("_raw").contains("composites"))
  .withColumn("componentDN", col("_raw").contains("componentDN"))

//Filters out any NULL values if found
val finalData = extractedDF.filter(
  col("managed_server").isNotNull &&
    col("alert_summary").isNotNull &&
    col("oracle_details").isNotNull &&
    col("ecid").isNotNull &&
    col("CompName").isNotNull &&
    col("composite_name").isNotNull &&
    col("compositename").isNotNull &&
    col("composites").isNotNull &&
    col("componentDN").isNotNull)

    finalData.show(false)
...