Сначала прочитайте csv
как фрейм данных pyspark.
from pyspark import SparkConf,SparkContext
from pyspark.sql import SQLContext
sc = SparkContext(conf = conf)
sql = SQLContext(sc)
df = sql.read.csv("cal.csv", header=True, mode="DROPMALFORMED")
Затем запишите его в mongodb
,
df.write.format('com.mongodb.spark.sql.DefaultSource').mode('append')\
.option('database',NAME).option('collection',COLLECTION_MONGODB).save()
Укажите NAME
и COLLECTION_MONGODB
как созданные
Кроме того, вам необходимо предоставить conf и пакеты вместе с spark-submit в соответствии с вашей версией,
/bin/spark-submit --conf "spark.mongodb.inuri=mongodb://127.0.0.1/DATABASE.COLLECTION_NAME?readPreference=primaryPreferred"
--conf "spark.mongodb.output.uri=mongodb://127.0.0.1/DATABASE.COLLECTION_NAME"
--packages org.mongodb.spark:mongo-spark-connector_2.11:2.2.0
tester.py
Укажите COLLECTION_NAME
и DATABASE
выше.tester.py
при условии имени файла кода.Для получения дополнительной информации см. this .