У меня есть CSV-файл, в котором есть столбцы, и для целей тестирования я помещаю его вручную в Kafka, а оттуда я читаю его в Spark и применяю некоторый анализ, и я делаю вывод на консоль для целей тестирования. Теперь я понимаю, что данные CSV передаются в виде значения в структурированном потоке, для которого я преобразовал их в строку. Мое требование, если я могу преобразовать данные значения в фактические столбцы. В CSV-файле есть сотни столбцов, но я смотрю только на два конкретных столбца "SERVICE_NAME8" и "_raw"
Я использую spark.sql для извлечения этих столбцов, когда я читаю CSV-файл из пути, но теперь я использую структурированную потоковую передачу. Я не уверен, смогу ли я извлечь эти конкретные столбцы в качестве нового кадра данных и применить мой анализ после этого
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "10.160.172.45:9092, 10.160.172.46:9092, 10.160.172.100:9092")
.option("subscribe", "TOPIC_WITH_COMP_P2_R2, TOPIC_WITH_COMP_P2_R2.DIT, TOPIC_WITHOUT_COMP_P2_R2.DIT")
.load()
val dfs = df.selectExpr("CAST(value AS STRING)").toDF()
val data =dfs.withColumn("splitted", split($"value", "/"))
.select($"splitted".getItem(4).alias("region"),$"splitted".getItem(5).alias("service"),col("value"))
.withColumn("service_type", regexp_extract($"service", """.*(Inbound|Outbound|Outound).*""",1))
.withColumn("region_type", concat(
when(col("region").isNotNull,col("region")).otherwise(lit("null")), lit(" "),
when(col("service").isNotNull,col("service_type")).otherwise(lit("null"))))
val extractedDF = data.filter(
col("region").isNotNull &&
col("service").isNotNull &&
col("value").isNotNull &&
col("service_type").isNotNull &&
col("region_type").isNotNull)
.filter("region != ''")
.filter("service != ''")
.filter("value != ''")
.filter("service_type != ''")
.filter("region_type != ''")
val query = extractedDF
.writeStream
.format("console")
.outputMode("append")
.trigger(ProcessingTime("20 seconds"))
.start()
После val dfs = df.selectExpr ("CAST (value AS STRING)"). ToDF () мне как-то нужно извлечь только два столбца "SERVICE_NAME8" & "_raw", а синтаксический анализ должен сделать все остальное и произвести вывод