Чем отличается pyspark.sql от pymongo? - PullRequest
0 голосов
/ 30 марта 2019

PySpark может обрабатывать потоковые данные как RDD от kafka.Затем я хочу сохранить обработанный результат в mongoDB.Два метода найдены и сработали.Чем они отличаются?

Метод 1: Использование pyspark.sql коннектора Метод 2: Использование pymongo

from pyspark.sql import SQLContext
from pyspark.sql.types import *

def method1(time, rdd):
    df = ctx.createDataFrame(rdd, ["word", "count"])
    df.write.format("com.mongodb.spark.sql").options( \
        uri="mongodb://127.0.0.1:27017",  \
        database="pyspark-pymongo", \
        collection="data").mode("append").save()

from pymongo import MongoClient
client = MongoClient("mongodb://127.0.0.1:27017")

def method2(time, rdd):
    doc = map(lambda d: {'name': d[0], 'count': d[1]}, rdd.collect())
    db = client['pyspark-pymongo']['data']
    db.insert(doc)

counts = lines.flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a+b)
counts.foreachRDD(method1)
counts.foreachRDD(method2)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...