Я использую алгоритм для пометки поля монго и на основании этого я добавляю новое поле в этот документ. Поскольку количество моих коллекций составляет около 1 миллиона, обновление и вставка занимают так много времени.
Пример данных:
{id:'a1',content:'some text1'}
{id:'a2',content:'some text2'}
код питона:
docs= db.col.find({})
for doc in docs:
out = do_operation(doc['content']) //do_operation is my algorithm
doc["tag"]=out
db.col.update(id:doc['id'],$set:{'Tag_flag':TRUE})
db.col2.insert(doc)
Принимая во внимание, что для увеличения скорости я использовал кадры данных spark, но кадры данных spark занимают много памяти и выдают ошибку памяти.
(конфигурация: 4 ядра и 16 ГБ ОЗУ на одном кластере Hadoop)
df = //loading mongodata to a dataframe
df1 = df.withColumn('tag',df.content)
output = []
for doc in df.rdd.collect():
out = do_operation(doc['content'])
output.append(out)
df2 = spark.createDataFrame(output)
final_df = df1.join(df2, df1._id == df2._id , 'inner')
//and finally inserting this dataframe into new collection.
Мне нужно оптимизировать мой искровой код, чтобы ускорить работу с меньшим объемом памяти.
Могу ли я использовать любой брокер сообщений, такой как Kafka, RabbitMQ или Reddis, между mongo и spark.
Будет ли это полезно?