Мне нужно решить проблему.У меня есть производитель kafka, который производит сообщения (xmls) на определенную тему.Теперь это потоковые данные, и я использую потоковую передачу для получения этих сообщений из темы.Я пишу эти сообщения на консольный вывод.Теперь есть операция, которая должна быть выполнена с этими сообщениями (в основном это xmls), эти xmls должны быть проанализированы, и я написал для них тоже парсер.Теперь, когда я использую xml как жестко закодированный в строковую переменную, парсер работает нормально.Но мне нужно получить xmls из фрейма данных spark.Это то, что я сделал для получения сообщений от kafka.
val spark = SparkSession.builder
.appName("kafka-ingestion")
.master("local")
.config("spark.sql.orc.impl","native")
.getOrCreate()
import spark.implicits._
// Gets the xpath
//val xpaths = get_xpaths(filename)
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "xmlpoc")
.option("encoding","UTF-8")
.option("startingOffsets", "earliest")
.load()
val df2 = df.selectExpr( "CAST(value AS STRING)")
df2.writeStream.format("console")
.outputMode("append")
.start().awaitTermination()
// Now perform parsing on each value of df2.
Когда я это делаю, я вижу все сообщения в кадре данных на консоли.Но я хочу выполнить операцию со всеми этими сообщениями (например, разбор здесь).Я понятия не имею, как это сделать.Я попробовал UDFS, но они добавляют еще один столбец, кроме моего значения столбца.Так что, похоже, ничего не работает.Любая помощь приветствуется.