Как определить происхождение сообщений в искровой структурированной потоковой передаче с кафкой в ​​качестве источника? - PullRequest
0 голосов
/ 24 июня 2019

У меня есть сценарий использования, в котором я должен подписаться на несколько тем в kafka в структурированной потоковой передаче .Затем я должен разобрать каждое сообщение и сформировать из него таблицу дельты озера.Я сделал синтаксический анализатор и сообщения (в форме XML) правильно разбора и формирования таблицы дельта-озера.Однако на данный момент я подписываюсь только на одну тему.Я хочу подписаться на несколько тем и, исходя из темы, следует обратиться к парсеру, специально созданному для этой конкретной темы.Поэтому в основном я хочу определить имя темы для всех сообщений по мере их обработки, чтобы я мог отправить их нужному анализатору и продолжить обработку.

Вот так я получаю доступ к сообщениям из разных тем.Тем не менее, я понятия не имею, как определить источник входящих сообщений при их обработке.

 val stream_dataframe = spark.readStream
  .format(ConfigSetting.getString("source"))
  .option("kafka.bootstrap.servers", ConfigSetting.getString("bootstrap_servers"))
  .option("kafka.ssl.truststore.location", ConfigSetting.getString("trustfile_location"))
  .option("kafka.ssl.truststore.password", ConfigSetting.getString("truststore_password"))
  .option("kafka.sasl.mechanism", ConfigSetting.getString("sasl_mechanism"))
  .option("kafka.security.protocol", ConfigSetting.getString("kafka_security_protocol"))
  .option("kafka.sasl.jaas.config",ConfigSetting.getString("jass_config"))
  .option("encoding",ConfigSetting.getString("encoding"))
  .option("startingOffsets",ConfigSetting.getString("starting_offset_duration"))
  .option("subscribe",ConfigSetting.getString("topics_name"))
  .option("failOnDataLoss",ConfigSetting.getString("fail_on_dataloss")) 
  .load()


 var cast_dataframe = stream_dataframe.select(col("value").cast(StringType))

 cast_dataframe =  cast_dataframe.withColumn("parsed_column",parser(col("value"))) // Parser is the udf, made to parse the xml from the topic. 

Как определить имя темы сообщений при обработке в потоковой передаче с искрой?

1 Ответ

1 голос
/ 24 июня 2019

Согласно официальной документации (выделено мое)

Каждая строка в источнике имеет следующую схему:

Тип столбца


бинарный ключ

двоичное значение

строка темы
раздел int

...

Как видите, входная тема является частью выходной схемы и доступна без каких-либо специальных действий.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...