Spark Structured Streaming с интеграцией реестра схем для сообщений на основе Avro - PullRequest
0 голосов
/ 18 апреля 2020

У нас есть сценарий использования, в котором мы пытаемся использовать несколько тем Kafka ( AVRO messages ), интегрирующихся с реестром схемы. Мы используем Spark Structured Streaming ( Версия Spark: 2.4.4 ), Confluent Kafka (Версия библиотеки: 5.4.1) для того же:

val kafkaDF: DataFrame = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<bootstrap.server>:9092")
  .option("subscribe", [List of Topics]) // multi topic subscribe
  .load()

Мы выбираем значение из выше DF и с использованием схемы для десериализации сообщения AVRO

val tableDF = kafkaDF.select(from_avro(col("value"), jsonSchema).as("table")).select("*")

Дорожный блок здесь, поскольку мы используем несколько тем Kafka, мы интегрировали всю нашу схему JSON в MAP с ключ представляет собой topi c name и значения представляют собой соответствующую схему . Как мы можем сделать поиск, используя ту же карту в списке выше? мы пробовали UDF, но он возвращает тип "col", но jsonSchema должен иметь тип String. Кроме того, схема различна для разных тем

Пара добавленных вопросов:

  1. Является ли вышеуказанный подход правильным для одновременного использования нескольких тем?
  2. Следует ли мы используем одного потребителя для каждой топи c?
  3. Если у нас есть больше тем, как мы можем достичь параллельной обработки топи c, потому что последовательная может занять значительное количество времени.

1 Ответ

1 голос
/ 19 апреля 2020

Не проверяя все это, вы, кажется, в порядке с основами from_avro, et c. from_ json, et c.

  1. https://sparkbyexamples.com/spark/spark-streaming-consume-and-produce-kafka-messages-in-avro-format/ может помочь вам в первой части. Это тоже очень хорошо https://www.waitingforcode.com/apache-spark-structured-streaming/two-topics-two-schemas-one-subscription-apache-spark-structured-streaming/read#filter_topic.

  2. Я бы сделал таблицу. *

  3. Несколько, несколько схем -> читать несколько таких версий из .avs c или кодируй сам.

  4. Для использования нескольких тем в приложении Spark Streaming вопрос состоит в том, сколько в приложении? Нет строгих правил, кроме очевидных, таких как большое потребление против меньшего потребления и, если порядок важен. Ресурсы исполнителя могут быть освобождены.

  5. Затем вам нужно обрабатывать все темы отдельно, как это imho - Filter on Topic - вы можете заполнить детали, так как я немного срочно - используя foreachBatch парадигма.

  6. Не уверен, как вы записываете данные в состоянии покоя, но вопрос не в этом.

Похоже затем обработать несколько тем:

...
... // Need to get Topic first
stream.toDS()
      .select($"topic", $"value")
      .writeStream
      .foreachBatch((dataset, _) => {
         dataset.persist() // need this

         dataset.filter($"topic" === topicA)
                .select(from_avro(col("value"), jsonSchemaTA)
                .as("tA"))
                .select("tA.*")
                .write.format(...).save(...)

         dataset.filter($"topic" === topicB)
                .select(from_avro(col("value"), jsonSchemaTB)
                .as("tB"))
                .select("tB.*")
                .write.format(...).save(...)
          ...
          ...
         dataset.unpersist()
         ...
         })

         .start().awaitTermination()

, но смешайте с этим превосходным ответом: Интеграция потоковой передачи Spark с реестром схемы Confluent

...