Как обрабатывать документы JSON (из MongoDB) и записывать в HBase в структурированном потоке? - PullRequest
2 голосов
/ 08 ноября 2019

Я получаю документ mongoDB, затем после обработки я хочу сохранить его в Hbase с использованием библиотеки Bson.Document

Изменение метода потоковой передачи с spark kafkastreaming на структурированную потоковую передачу. Поэтому более ранний метод с использованием kafkaUtils создавал Dstream [Document]

При структурированной потоковой передаче я получаю набор данных [Документ]

scala> val stream = spark.readStream.format("kafka").option("kafka.bootstrap.servers","brokerList").option("subscribe", s"topic_name").load().selectExpr("CAST(value AS STRING)")
stream: org.apache.spark.sql.DataFrame = [value: string]

scala> val strming_doc = stream.map(record => record.getAs[String]("value")
scala> org.apache.spark.sql.Dataset[String] = [value: string]

для дальнейших процессов мне нужно получить документ из набора данных

scala> val stream = spark.readStream.format("kafka").option("kafka.bootstrap.servers","brokerList").option("subscribe", s"topic_name").load().selectExpr("CAST(value AS STRING)")
stream: org.apache.spark.sql.DataFrame = [value: string]

scala> val strming_doc = stream.map(record => record.getAs[String]("value")
scala> org.apache.spark.sql.Dataset[String] = [value: string]

Мне нужно получить документиз набора данных, в основном, чтобы получить данные из mongoDB

1 Ответ

0 голосов
/ 11 ноября 2019

Кажется, вам нужны операторы foreach или foreachBatch для записи результата потокового запроса в HBase. Пожалуйста, обратитесь к Использование Foreach и ForeachBatch в официальной документации.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...