Загрузите / импортируйте CSV-файл в mongodb, используя PYSPARK - PullRequest
0 голосов
/ 28 сентября 2018

Я хочу знать, как загрузить / импортировать CSV-файл в mongodb с помощью pyspark.У меня есть CSV-файл с именем cal.csv, размещенный на рабочем столе.Может кто-нибудь поделиться фрагментом кода.

Ответы [ 2 ]

0 голосов
/ 03 октября 2018

Это сработало для меня.база данных: люди Коллекция: con

pyspark --conf "spark.mongodb.input.uri=mongodb://127.0.0.1/people.con?readPreference=primaryPreferred" \
    --conf "spark.mongodb.output.uri=mongodb://127.0.0.1/people.con" \
    --packages org.mongodb.spark:mongo-spark-connector_2.11:2.3.0


from pyspark.sql import SparkSession

my_spark = SparkSession \
         .builder \
         .appName("myApp") \
         .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/people.con") \
         .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/people.con") \
         .getOrCreate()

df = spark.read.csv(path = "file:///home/user/Desktop/people.csv", header=True, inferSchema=True)

df.printSchema()

df.write.format("com.mongodb.spark.sql.DefaultSource").mode("append").option("database","people").option("collection", "con").save()

Далее перейдите к Монго и проверьте, записана ли коллекция, выполнив следующие шаги

mongo
show dbs
use people
show collections
db.con.find().pretty()
0 голосов
/ 01 октября 2018

Сначала прочитайте csv как фрейм данных pyspark.

from pyspark import SparkConf,SparkContext
from pyspark.sql import SQLContext

sc = SparkContext(conf = conf)
sql = SQLContext(sc)

df = sql.read.csv("cal.csv", header=True, mode="DROPMALFORMED")

Затем запишите его в mongodb,

df.write.format('com.mongodb.spark.sql.DefaultSource').mode('append')\
        .option('database',NAME).option('collection',COLLECTION_MONGODB).save()

Укажите NAME и COLLECTION_MONGODB как созданные

Кроме того, вам необходимо предоставить conf и пакеты вместе с spark-submit в соответствии с вашей версией,

/bin/spark-submit --conf "spark.mongodb.inuri=mongodb://127.0.0.1/DATABASE.COLLECTION_NAME?readPreference=primaryPreferred"
                  --conf "spark.mongodb.output.uri=mongodb://127.0.0.1/DATABASE.COLLECTION_NAME" 
                  --packages org.mongodb.spark:mongo-spark-connector_2.11:2.2.0
                  tester.py

Укажите COLLECTION_NAME и DATABASE выше.tester.py при условии имени файла кода.Для получения дополнительной информации см. this .

...