У нас есть сценарий использования, в котором мы пытаемся использовать несколько тем Kafka ( AVRO messages ), интегрирующихся с реестром схемы. Мы используем Spark Structured Streaming ( Версия Spark: 2.4.4 ), Confluent Kafka (Версия библиотеки: 5.4.1) для того же:
val kafkaDF: DataFrame = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<bootstrap.server>:9092")
.option("subscribe", [List of Topics]) // multi topic subscribe
.load()
Мы выбираем значение из выше DF и с использованием схемы для десериализации сообщения AVRO
val tableDF = kafkaDF.select(from_avro(col("value"), jsonSchema).as("table")).select("*")
Дорожный блок здесь, поскольку мы используем несколько тем Kafka, мы интегрировали всю нашу схему JSON в MAP с ключ представляет собой topi c name и значения представляют собой соответствующую схему . Как мы можем сделать поиск, используя ту же карту в списке выше? мы пробовали UDF, но он возвращает тип "col", но jsonSchema должен иметь тип String. Кроме того, схема различна для разных тем
Пара добавленных вопросов:
- Является ли вышеуказанный подход правильным для одновременного использования нескольких тем?
- Следует ли мы используем одного потребителя для каждой топи c?
- Если у нас есть больше тем, как мы можем достичь параллельной обработки топи c, потому что последовательная может занять значительное количество времени.