Написать BsonDocument в MongoDB Spark Scala - PullRequest
0 голосов
/ 27 мая 2018

Я хочу сохранить данные в MongoDB при потоковой передаче из Twitter.Каждый RDD в DStream содержит Array [String] со значениями, поэтому я установил ключи для этих значений и поместил их в org.bson.document.Когда я пытаюсь записать Seq of Documents в MongoDB, я получаю такое исключение:

ERROR Executor: Exception in task 1.0 in stage 8.0 (TID 9)
java.lang.IllegalArgumentException: clusterListener can not be null

Я использовал соединитель Spark MongoDB, поэтому вот зависимости в моем файле build.sbt:

val sparkVersion = "2.2.0"

libraryDependencies ++= Seq(
  "org.apache.kafka" %% "kafka" % "1.1.0",
  "org.apache.bahir" %% "spark-streaming-twitter" % sparkVersion,
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion,
  "com.typesafe" % "config" % "1.3.0",
  "org.twitter4j" % "twitter4j-core" % "4.0.6",
  "org.twitter4j" % "twitter4j-stream" % "4.0.6",
  "com.twitter" %% "bijection-avro" % "0.9.6",
  "org.mongodb.spark" %% "mongo-spark-connector" % "2.2.2",
  "org.mongodb.scala" %% "mongo-scala-driver" % "2.2.0",
  "org.json4s" %% "json4s-native" % "3.5.3"
)

Кроме того, я использовал образ докера MongoDB в своем файле docker-compose:

version: '3.3'
services:
  kafka:
      image: spotify/kafka
      ports:
        - "9092:9092"
      environment:
      - ADVERTISED_HOST=localhost
  mongo:
      image: mongo
      restart: always
      environment:
        MONGO_INITDB_ROOT_USERNAME: admin
        MONGO_INITDB_ROOT_PASSWORD: pwd
  mongo-express:
      image: mongo-express
      restart: always
      ports:
        - 8081:8081
      environment:
        ME_CONFIG_MONGODB_ADMINUSERNAME: admin
        ME_CONFIG_MONGODB_ADMINPASSWORD: pwd

Это код для потоковой передачи и записи в базу данных.WordArrays здесь имеет тип DStream [Array [String]]

wordsArrays.foreachRDD(rdd => rdd.collect.foreach(
        record => {
          val docs = sparkContext.parallelize(Seq(new Document("tweetId", record(0)),
            new Document("text", record(1)),
            new Document("favoriteCount", record(1)),
            new Document("retweetCount", record(1)),
            new Document("geoLocation", record(1)),
            new Document("language", record(1)),
            new Document("createdAt", record(1))
          ))
          MongoSpark.save(docs)
        }
    ))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...