Чтение и удаление из mongodb с помощью многопроцессорной обработки Python - PullRequest
0 голосов
/ 26 октября 2019

У меня есть mongodb с коллекцией incoming и коллекцией target. В настоящее время рабочий процесс выполняет следующее (упрощенно):

def worker(number):
    incomings = db.incoming.find()
    buffersize=5
    readcounter=0
    for incoming in incomings:
        readcounter+=1
        documentToInsert={'keyfield':incoming["keyfield"], +other fields after some treatments...}
        documentsToInsert.append(incoming)
        documentToDelete={'_id':incoming["_id"]}
        documentsToDelete.append(documentToDelete)
        if readcounter >= readbuffer:
            readcounter=0
            db.incoming.remove({'_id': { '$in': [ObjectId(docs["_id"]) for docs in documentsToDelete]}})
            db.target.insert_many([docs for docs in documentsToInsert],ordered=False)

Конечно, операторы remove и insert_many окружены try/except.

Поскольку данные поступают быстрее, чемон обрабатывается рабочим / one, мне нужно стать быстрее, например, вызывая его на всех процессорах, что должно произойти в любом случае, чтобы быть эффективным. Я делаю это с помощью следующего кода:

if __name__== "__main__":
    procs=[]
    number=0
    for cpu in range(multiprocessing.cpu_count()):
        procs.append(multiprocessing.Process(target = worker, args = (number,)))
        number+=1
    for proc in procs:
        proc.start()
    for proc in procs:
        proc.join()
    print("=====================FIN=========================")

Проблема в том, что пока один поток читает документы buffersize, другие потоки получают одинаковые документы, что приводит к дилемме, которую делает только один потокуспешные вставки в target, другие потоки выдают дубликаты исключений ключа. Этот эффект делает только один процесс полезным. Без многопоточности комбо remove / insert_many работает нормально, и я легко могу работать с более высокими размерами буферизации.

Я думал о вставке данных в incoming с дополнительным полем, чтобы квалифицировать работника number, ноэто занимает дополнительное дисковое пространство и израсходует дополнительную обработку, плюс, во время генерации, я не знаю, сколько рабочих будет работать с данными.

Я уже пытался спать случайное время в каждом потоке, но это абсолютно непредсказуемо и не предотвращает ошибки как таковые.

Что я могу сделать, чтобы все потоки обрабатывали разные данные?

Ответы [ 2 ]

1 голос
/ 26 октября 2019

Согласно моему комментарию, я думаю, что брокер сообщений, использующий что-то вроде RabbitMQ, лучше всего подходит для вашего случая использования. С RabbitMQ и аналогичными брокерами сообщений (я не использовал 0mq) вам не нужно подпитывать другие потоки, просто запускайте столько потоков, сколько вам нужно, каждый подписывается, и брокер доставляет сообщения по очереди.

0 голосов
/ 28 октября 2019

Спасибо @ Belly Buster за идею разделения процедур с помощью * MQ. Я решил эту проблему с помощью ZeroMQ, который не содержит брокеров, но в этом случае я реализовал брокер балансировки нагрузки, основанный на примере брокера балансировки нагрузки для ZeroMQ . Клиент читает из базы данных, в то время как рабочие работают с записями, которые они получают ZeroMQ. Я попытался добавить несколько подробных комментариев в код, чтобы прояснить несколько моментов. В коде отсутствуют несколько служебных классов, которые я написал и которые не являются частью этого решения;этот код просто для ответа на вопрос в надежде, что кто-нибудь найдет его полезным.

"""
Original Author: Brandon Carpenter (hashstat) <brandon(dot)carpenter(at)pnnl(dot)gov>
This code was part of the ZeroMQ Tutorial and implements the Load-balancing broker pattern.
Modified by @https://stackoverflow.com/users/2226028/michael
"""

from __future__ import print_function

import multiprocessing
import zmq
import io
import pymongo
from pymongo import MongoClient
import time
from pprint import pprint
import ast
import json
from bson.json_util import dumps
from datetime import datetime
from PairConfig import PairConfig
from PairController import PairController
import ctypes
import sys
from random import randint

NBR_CLIENTS = 1
NBR_WORKERS = 3

# Load the configuration file
# this is a configuration class which is not documented here
pairConfig=PairConfig("verify.ini")

# multiprocessing shared variables setup
manager = multiprocessing.Manager()
insertbuffer=manager.list()
deletebuffer=manager.list()
totalcounter=multiprocessing.Value(ctypes.c_int,0)

def client_task(ident):
    try:
        """Basic request-reply client using REQ socket."""
        client = MongoClient(pairConfig.config.get('db','url'))
        db=client.databasename
        socket = zmq.Context().socket(zmq.REQ)
        socket.identity = u"Client-{}".format(ident).encode("ascii")
        socket.connect("ipc://frontend.ipc")

        while True:
            incomings = db.incoming.find()
            # this makes it safe(r) to run this on different nodes
            incomings.skip(randint(randint(1,500),randint(5000,500000)))
            for incoming in incomings:
                pair = {'primarykey' : incoming["primarykey"], 'value' : incoming["value"]}
                # Send request, get reply
                socket.send_string(b"%s" % pair)
                reply = socket.recv()
    except KeyboardInterrupt:
        print("\nexit client")

def worker_task(ident,insertbuffer,deletebuffer,mylock):
    try:
        """Worker task, using a REQ socket to do load-balancing."""
        socket = zmq.Context().socket(zmq.REQ)
        socket.identity = u"Worker-{}".format(ident).encode("ascii")
        socket.connect("ipc://backend.ipc")

        socket.send(b"READY")

        # this is a helper class which is not documented here
        pairController=PairController(pairConfig)
        while True:
            address, empty, request = socket.recv_multipart()
            with totalcounter.get_lock():
                totalcounter.value+=1
            dictToInsert = ast.literal_eval(request.encode("ascii"))
            dictToInsert["last_checked"]=datetime.now()
            insertbuffer.append(dictToInsert)
            deletebuffer.append(dictToInsert["primarykey"])
            # ... do some timely treatment here - a lot of variable time gets burned here ...
            # result will be result1 and result2, for the sake of simplification I will fill it with random numbers here
            result1=randint(1,10)
            result2=randint(1,10)
            sys.stdout.write("%s %s insertbuffer: %d, deletebuffer: %d, totalcounter: %d, b: %s, r: %s            \r" % (socket.identity.decode("ascii"),dictToInsert["primarykey"],len(insertbuffer),len(deletebuffer),totalcounter.value,result1,result2))
            sys.stdout.flush()
            # readbuffer comes from an ini file ... I chose 500 for now
            if len(insertbuffer[:]) >= int(pairConfig.config.get('verify','readbuffer')) and ident==0:
                mylock.acquire()
                # these 2 methods are inside a class pairController which is not documented here,
                # it's basically one method for insert_many() and one method for remove(), 
                # each time with the respective buffer as a filter
                pairController.storePairs("history",insertbuffer[:])
                pairController.deletePairs("history",deletebuffer[:])
                # this empties the buffers for all filters:
                insertbuffer[:]=[]
                deletebuffer[:]=[]
                mylock.release()
            socket.send_multipart([address, b"", b"ok"])
    except KeyboardInterrupt:
        print("\nexit worker")

def main():
    """Load balancer main loop."""
    # Prepare context and sockets
    context = zmq.Context.instance()
    frontend = context.socket(zmq.ROUTER)
    frontend.bind("ipc://frontend.ipc")
    backend = context.socket(zmq.ROUTER)
    backend.bind("ipc://backend.ipc")

    # Start background tasks
    mylock = multiprocessing.Lock()
    def start(task, *args):
        process = multiprocessing.Process(target=task, args=args)
        process.daemon = True
        process.start()
    for i in range(NBR_CLIENTS):
        start(client_task, i)
    for i in range(NBR_WORKERS):
        start(worker_task, i, insertbuffer, deletebuffer, mylock)

    # Initialize main loop state
    count = NBR_CLIENTS
    workers = []
    poller = zmq.Poller()
    # Only poll for requests from backend until workers are available
    poller.register(backend, zmq.POLLIN)

    while True:
        sockets = dict(poller.poll())

        if backend in sockets:
            # Handle worker activity on the backend
            request = backend.recv_multipart()
            worker, empty, client = request[:3]
            if not workers:
                # Poll for clients now that a worker is available
                poller.register(frontend, zmq.POLLIN)
            workers.append(worker)
            if client != b"READY" and len(request) > 3:
                # If client reply, send rest back to frontend
                empty, reply = request[3:]
                frontend.send_multipart([client, b"", reply])
                count -= 1

        if frontend in sockets:
            # Get next client request, route to last-used worker
            client, empty, request = frontend.recv_multipart()
            worker = workers.pop(0)
            backend.send_multipart([worker, b"", client, b"", request])
            if not workers:
                # Don't poll clients if no workers are available
                poller.unregister(frontend)

    # Clean up
    backend.close()
    frontend.close()
    context.term()

if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("\nexit main")
...