Лучший способ получить доступ к Mongodb внутри искрового работника - PullRequest
0 голосов
/ 26 декабря 2018

Итак, я соединяю кафку с искрой и хочу найти это значение в моноблоке.Я не знаю, какой подход использовать внутри spark-клиента, использовать клиент mongdb или обращаться к mongo с сеансом spark.

Используя pyspark, я попытался обработать, что получить от kafka (объект json) внутри foreachrddfunc и попытался найти несколько значений в dong монго.

Если я использую сеанс искры: - Я должен каждый раз создавать сеанс.Если я использовал постоянный метод вне рабочего, сохранится ли он среди всех рабочих?- Это быстрее, чем при использовании обычного клиентского подключения Python Mongo?

Это лучший способ получить сообщение от kafka и обработать его в spark?

sc = SparkSession.builder.appName("test_kafka_app") \
    .config("spark.mongodb.input.uri", MONGO_URL) \
    .config("spark.mongodb.output.uri", MONGO_URL) \
    .getOrCreate()
ww = sc.read.format("com.mongodb.spark.sql.DefaultSource").option("uri",
                                                                  MONGO_URL).load()
ww.persist()
def test(value):
    try:
        sc = SparkSession.builder.appName("test_kafka_app") \
            .config("spark.mongodb.input.uri", MONGO_URL) \
            .config("spark.mongodb.output.uri", MONGO_URL) \
            .getOrCreate()
        ww = sc.read.format("com.mongodb.spark.sql.DefaultSource").option("uri",
                                                                        MONGO_URL).load()
        data = json.loads(value[1])
        if "val1" in data:
                print(ww.filter(func.array_contains(
                    ww.data, data["val1"])).collect())
    except Exception as ee:
        print(ee)
ssc = StreamingContext(sc.sparkContext, 2)
brokers = "localhost:9092"  # sys.argv[1:]
topic = "new_topic"
kvs = KafkaUtils.createDirectStream(ssc,
                                    [topic],
                                    {
                                        "metadata.broker.list": brokers
                                    })
lines = kvs.flatMap(lambda x: x[1])
kvs.foreachRDD(lambda line: line.foreach(test))
ssc.start()
ssc.awaitTermination()
...