У меня есть задание Spark Structured Streaming, которое направляет данные из нескольких тем Kafka на основе subscribePattern
, и для каждой топики Kafka c У меня есть схема Spark. При потоковой передаче данных из Kafka я хочу применить схему Spark к сообщению Kafka на основе имени topi c.
Учтите, у меня есть две темы: cust & клиенты .
Потоковая передача данных из Kafka на основе subscribePattern
(Java строка регулярного выражения):
var df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "cust*")
.option("startingOffsets", "earliest")
.load()
.withColumn("value", $"value".cast("string"))
.filter($"value".isNotNull)
вышеуказанный потоковый запрос направляет данные из обеих тем.
Допустим, у меня есть две схемы Spark, по одной для каждой топи c:
var cust: StructType = new StructType()
.add("name", StringType)
.add("age", IntegerType)
var customers: StructType = new StructType()
.add("id", IntegerType)
.add("first_name", StringType)
.add("last_name", StringType)
.add("email", StringType)
.add("address", StringType)
Теперь я хочу применить схему Spark на основе имени топи c и сделать это Я написал udf, который читает имя topi c и возвращает схему в формате DDL:
val schema = udf((table: String) => (table) match {
case ("cust") => cust.toDDL
case ("customers") => customers.toDDL
case _ => new StructType().toDDL
})
Затем я использую udf (я понимаю, что udf применяется к каждому столбцу) внутри from_ json метод, подобный этому:
val query = df
.withColumn("topic", $"topic".cast("string"))
.withColumn("data", from_json($"value", schema($"topic")))
.select($"key", $"topic", $"data.*")
.writeStream.outputMode("append")
.format("console")
.start()
.awaitTermination()
Это дает мне следующее исключение, которое является правильным, потому что from_ json ожидает строковую схему в формате DDL или StructType.
org.apache.spark.sql.AnalysisException: Schema should be specified in DDL format as a string literal or output of the schema_of_json function instead of UDF(topic);
Я хочу знать, как выполнить sh это?
Любая помощь будет оценена!