Как применить схему Spark к запросу на основе имени Kafka topi c в структурированной потоковой передаче Spark? - PullRequest
0 голосов
/ 29 апреля 2020

У меня есть задание Spark Structured Streaming, которое направляет данные из нескольких тем Kafka на основе subscribePattern, и для каждой топики Kafka c У меня есть схема Spark. При потоковой передаче данных из Kafka я хочу применить схему Spark к сообщению Kafka на основе имени topi c.

Учтите, у меня есть две темы: cust & клиенты .

Потоковая передача данных из Kafka на основе subscribePattern (Java строка регулярного выражения):

var df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "cust*")
  .option("startingOffsets", "earliest") 
  .load()
  .withColumn("value", $"value".cast("string"))
  .filter($"value".isNotNull)

вышеуказанный потоковый запрос направляет данные из обеих тем.

Допустим, у меня есть две схемы Spark, по одной для каждой топи c:

var cust: StructType = new StructType()
    .add("name", StringType)
    .add("age", IntegerType)

var customers: StructType = new StructType()
    .add("id", IntegerType)
    .add("first_name", StringType)
    .add("last_name", StringType)
    .add("email", StringType)
    .add("address", StringType)

Теперь я хочу применить схему Spark на основе имени топи c и сделать это Я написал udf, который читает имя topi c и возвращает схему в формате DDL:

val schema = udf((table: String) => (table) match {
    case ("cust")      => cust.toDDL
    case ("customers") => customers.toDDL
    case _             => new StructType().toDDL
  })

Затем я использую udf (я понимаю, что udf применяется к каждому столбцу) внутри from_ json метод, подобный этому:

val query = df
    .withColumn("topic", $"topic".cast("string"))
    .withColumn("data", from_json($"value", schema($"topic")))
    .select($"key", $"topic", $"data.*")
    .writeStream.outputMode("append")
    .format("console")
    .start()
    .awaitTermination()

Это дает мне следующее исключение, которое является правильным, потому что from_ json ожидает строковую схему в формате DDL или StructType.

org.apache.spark.sql.AnalysisException: Schema should be specified in DDL format as a string literal or output of the schema_of_json function instead of UDF(topic);

Я хочу знать, как выполнить sh это?

Любая помощь будет оценена!

1 Ответ

2 голосов
/ 30 апреля 2020

То, что вы делаете, невозможно. Ваш query df не может иметь 2 разные схемы.

Я могу придумать 2 способа сделать это:

  1. Разделить ваш df на topi c, затем применить ваши 2 схемы к 2 dfs (cust и customers)
  2. Объединить 2 схемы в 1 схему и применить это ко всем темам.
...