Я получаю документ mongoDB, затем после обработки я хочу сохранить его в Hbase с использованием библиотеки Bson.Document
Изменение метода потоковой передачи с spark kafkastreaming на структурированную потоковую передачу. Поэтому более ранний метод с использованием kafkaUtils создавал Dstream [Document]
При структурированной потоковой передаче я получаю набор данных [Документ]
scala> val stream = spark.readStream.format("kafka").option("kafka.bootstrap.servers","brokerList").option("subscribe", s"topic_name").load().selectExpr("CAST(value AS STRING)")
stream: org.apache.spark.sql.DataFrame = [value: string]
scala> val strming_doc = stream.map(record => record.getAs[String]("value")
scala> org.apache.spark.sql.Dataset[String] = [value: string]
для дальнейших процессов мне нужно получить документ из набора данных
scala> val stream = spark.readStream.format("kafka").option("kafka.bootstrap.servers","brokerList").option("subscribe", s"topic_name").load().selectExpr("CAST(value AS STRING)")
stream: org.apache.spark.sql.DataFrame = [value: string]
scala> val strming_doc = stream.map(record => record.getAs[String]("value")
scala> org.apache.spark.sql.Dataset[String] = [value: string]
Мне нужно получить документиз набора данных, в основном, чтобы получить данные из mongoDB