Сохранение данных из Spark в MongoDB и Mysql - PullRequest
0 голосов
/ 30 апреля 2018

Я могу сохранять данные из spark в Mysql, но не в MongoDB и Mysql одновременно. Может кто-нибудь сказать мне, как это сделать? Ниже я упомянул мой код для выравнивания JSON, def для сохранения в MongoDB и команды spark-submit.

Я пытаюсь сохранить необработанные данные Twitter в MongoDB. Кто-нибудь может мне помочь?

Код для выравнивания JSON для MongoDB:

    def convertMongo(rdd):
    try:
        spark = getSparkSessionInstance(rdd.context.getConf())
        df_json =  spark.createDataFrame(rdd.map(lambda x: (_flatten_JSON(json.loads(x[1])))))
        return df_json
    except Exception as e:
        print(str(e))

Код для сохранения в MongoDB:

def write_mongo(rdd):
    try:
        mongoDFRDD = convertMongo(rdd)
        if(mongoDFRDD is not None):
            mongoDFRDD.write.format('com.mongodb.spark.sql.DefaultSource').mode('append').option("hos$
    except Exception as e: 
        print(str(e))

Spark Submit Команда Работает:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.1,mysql:mysql-connector-java:5.1.45 --jars spark-streaming-kafka-0-8-assembly_2.11-2.0.0.jar MachineLearningOnStreamDataInSpark2.py

Команда Spark Submit с не включенным MongoDB --conf:

spark-submit --conf spark.logConf = true --conf "spark.mongodb.input.uri=mongodb://127.0.0.1:27017/Twitter.TwitterData?readPreference=primaryPreferred" --conf "spark.mongodb.output.uri=mongodb://127.0.0.1:27017/Twitter.TwitterData" --packages org.mongodb.spark:mongo-spark-connector_2.11:2.0.0 --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.1 --jars spark-streaming-kafka-0-8-assembly_2.11-2.0.0.jar MachineLearningOnStreamDataInSpark2.py

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...