Я могу сохранять данные из spark в Mysql, но не в MongoDB и Mysql одновременно. Может кто-нибудь сказать мне, как это сделать? Ниже я упомянул мой код для выравнивания JSON, def для сохранения в MongoDB и команды spark-submit.
Я пытаюсь сохранить необработанные данные Twitter в MongoDB. Кто-нибудь может мне помочь?
Код для выравнивания JSON для MongoDB:
def convertMongo(rdd):
try:
spark = getSparkSessionInstance(rdd.context.getConf())
df_json = spark.createDataFrame(rdd.map(lambda x: (_flatten_JSON(json.loads(x[1])))))
return df_json
except Exception as e:
print(str(e))
Код для сохранения в MongoDB:
def write_mongo(rdd):
try:
mongoDFRDD = convertMongo(rdd)
if(mongoDFRDD is not None):
mongoDFRDD.write.format('com.mongodb.spark.sql.DefaultSource').mode('append').option("hos$
except Exception as e:
print(str(e))
Spark Submit Команда Работает:
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.1,mysql:mysql-connector-java:5.1.45 --jars spark-streaming-kafka-0-8-assembly_2.11-2.0.0.jar MachineLearningOnStreamDataInSpark2.py
Команда Spark Submit с не включенным MongoDB --conf:
spark-submit --conf spark.logConf = true --conf "spark.mongodb.input.uri=mongodb://127.0.0.1:27017/Twitter.TwitterData?readPreference=primaryPreferred" --conf "spark.mongodb.output.uri=mongodb://127.0.0.1:27017/Twitter.TwitterData" --packages org.mongodb.spark:mongo-spark-connector_2.11:2.0.0 --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.1 --jars spark-streaming-kafka-0-8-assembly_2.11-2.0.0.jar MachineLearningOnStreamDataInSpark2.py