Как использовать структурированную потоковую искру в python для вставки строки в Mongodb с помощью ForeachWriter? - PullRequest
1 голос
/ 05 февраля 2020

Я использую Streaming для чтения данных из kafka и вставки их в mongodb. Я использую pyspark 2.4.4. Я пытаюсь использовать ForeachWriter, потому что использование только для каждого метода означает, что соединение будет устанавливаться для каждой строки.

    def open(self, partition_id, epoch_id):
        # Open connection. This method is optional in Python.
        self.connection = MongoClient("192.168.0.100:27017")
        self.db = self.connection['test']
        self.coll = self.db['output']    
        print(epoch_id)
        pass

    def process(self, row):
        # Write row to connection. This method is NOT optional in Python.
        #self.coll=None -> used this to test, if I'm getting an exception if it is there but I'm not getting one
        self.coll.insert_one(row.asDict())
        pass

    def close(self, error):
        # Close the connection. This method in optional in Python.
        print(error)
        pass

df_w=df7\
        .writeStream\
        .foreach(ForeachWriter())\
        .trigger(processingTime='1 seconds') \
        .outputMode("update") \
        .option("truncate", "false")\
        .start()df_w=df7\
        .writeStream\
        .foreach(ForeachWriter())\
        .trigger(processingTime='1 seconds') \
        .outputMode("update") \
        .option("truncate", "false")\
        .start()

Моя проблема не в том, чтобы вставлять в mongodb, и я не могу найти решения для этого. Если закомментировать, я получу ошибку. Но метод процесса не выполняется. У кого-нибудь есть идеи?

1 Ответ

0 голосов
/ 05 февраля 2020

Вы установили коллекцию на None в первой строке функции процесса. Поэтому вы вставляете строку в никуда. Кроме того, я не знаю, здесь ли это, или в вашем коде, но у вас есть запись writeStream два раза.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...