Я использую Streaming для чтения данных из kafka и вставки их в mongodb. Я использую pyspark 2.4.4. Я пытаюсь использовать ForeachWriter, потому что использование только для каждого метода означает, что соединение будет устанавливаться для каждой строки.
def open(self, partition_id, epoch_id):
# Open connection. This method is optional in Python.
self.connection = MongoClient("192.168.0.100:27017")
self.db = self.connection['test']
self.coll = self.db['output']
print(epoch_id)
pass
def process(self, row):
# Write row to connection. This method is NOT optional in Python.
#self.coll=None -> used this to test, if I'm getting an exception if it is there but I'm not getting one
self.coll.insert_one(row.asDict())
pass
def close(self, error):
# Close the connection. This method in optional in Python.
print(error)
pass
df_w=df7\
.writeStream\
.foreach(ForeachWriter())\
.trigger(processingTime='1 seconds') \
.outputMode("update") \
.option("truncate", "false")\
.start()df_w=df7\
.writeStream\
.foreach(ForeachWriter())\
.trigger(processingTime='1 seconds') \
.outputMode("update") \
.option("truncate", "false")\
.start()
Моя проблема не в том, чтобы вставлять в mongodb, и я не могу найти решения для этого. Если закомментировать, я получу ошибку. Но метод процесса не выполняется. У кого-нибудь есть идеи?