Спасибо @Belly Buster за идею разделить процедуры с помощью *MQ. Я решил эту проблему с помощью ZeroMQ, который сам по себе не требует брокера, однако в данном случае я реализовал брокер балансировки нагрузки, основанный на примере брокера балансировки нагрузки из руководства ZeroMQ. Клиент читает записи из базы данных, а рабочие процессы обрабатывают записи, которые они получают через ZeroMQ. Я постарался добавить в код подробные комментарии, чтобы прояснить некоторые моменты. В коде отсутствуют несколько служебных классов, которые я написал и которые не являются частью этого решения; этот код приведён только как ответ на вопрос — в надежде, что кто-нибудь найдёт его полезным.
"""
Original Author: Brandon Carpenter (hashstat) <brandon(dot)carpenter(at)pnnl(dot)gov>
This code was part of the ZeroMQ Tutorial and implements the Load-balancing broker pattern.
Modified by @https://stackoverflow.com/users/2226028/michael
"""
from __future__ import print_function
import multiprocessing
import zmq
import io
import pymongo
from pymongo import MongoClient
import time
from pprint import pprint
import ast
import json
from bson.json_util import dumps
from datetime import datetime
from PairConfig import PairConfig
from PairController import PairController
import ctypes
import sys
from random import randint
# Number of client and worker processes that main() starts.
NBR_CLIENTS = 1
NBR_WORKERS = 3
# Load the configuration file.
# PairConfig is a project configuration class which is not documented here;
# the code below reads the 'db'/'url' and 'verify'/'readbuffer' settings from it.
pairConfig=PairConfig("verify.ini")
# Shared state for the worker processes.
# NOTE(review): all of this is created at import time; presumably the script
# relies on the "fork" start method so that children inherit these objects --
# confirm before running on a platform that uses "spawn".
manager = multiprocessing.Manager()
# Records (dicts) that workers have processed and that still have to be stored.
insertbuffer=manager.list()
# Primary keys of the records in insertbuffer, used as the delete filter.
deletebuffer=manager.list()
# Number of requests handled by all workers together.
totalcounter=multiprocessing.Value(ctypes.c_int,0)
def client_task(ident):
    """Basic request-reply client using a REQ socket.

    Reads documents from the ``incoming`` collection and sends each one to
    the broker frontend as the text form of a
    ``{'primarykey': ..., 'value': ...}`` dict, waiting for the reply before
    sending the next one. Runs until interrupted with Ctrl-C.

    Args:
        ident: Number used to build the socket identity ``Client-<ident>``.
    """
    mongo_client = None
    socket = None
    try:
        mongo_client = MongoClient(pairConfig.config.get('db', 'url'))
        db = mongo_client.databasename
        socket = zmq.Context().socket(zmq.REQ)
        socket.identity = u"Client-{}".format(ident).encode("ascii")
        socket.connect("ipc://frontend.ipc")
        while True:
            incomings = db.incoming.find()
            # this makes it safe(r) to run this on different nodes
            # NOTE(review): the random offset can exceed the number of
            # documents, in which case the cursor is empty and the outer loop
            # simply queries again -- confirm that this is intended.
            incomings.skip(randint(randint(1, 500), randint(5000, 500000)))
            for incoming in incomings:
                pair = {
                    'primarykey': incoming["primarykey"],
                    'value': incoming["value"],
                }
                # Send the dict as text; the worker parses it back with
                # ast.literal_eval. str(pair) is the same payload the old
                # b"%s" % pair produced on Python 2, but it also works on
                # Python 3, where bytes formatting rejects a dict.
                socket.send_string(str(pair))
                # A REQ socket must receive the reply before the next send;
                # the reply content itself is not used.
                socket.recv()
    except KeyboardInterrupt:
        print("\nexit client")
    finally:
        # Release the connections; linger=0 drops unsent messages so that
        # shutting down cannot block on them.
        if socket is not None:
            socket.close(linger=0)
        if mongo_client is not None:
            mongo_client.close()
def worker_task(ident, insertbuffer, deletebuffer, mylock):
    """Worker task, using a REQ socket to do load-balancing.

    Announces itself to the broker backend with ``READY``, then handles one
    request at a time: the request is parsed into a dict, stamped with
    ``last_checked`` and queued in the shared buffers, and ``ok`` is sent
    back. Worker 0 additionally flushes the shared buffers to the database
    once they reach the configured ``verify``/``readbuffer`` size.
    Runs until interrupted with Ctrl-C.

    Args:
        ident: Worker number; builds the socket identity ``Worker-<ident>``
            and selects the flushing worker (``ident == 0``).
        insertbuffer: Shared list of processed records waiting to be stored.
        deletebuffer: Shared list of the primary keys of those records.
        mylock: Lock guarding both buffers.
    """
    socket = None
    try:
        socket = zmq.Context().socket(zmq.REQ)
        socket.identity = u"Worker-{}".format(ident).encode("ascii")
        socket.connect("ipc://backend.ipc")
        socket.send(b"READY")
        # this is a helper class which is not documented here
        pairController = PairController(pairConfig)
        # readbuffer comes from an ini file ... I chose 500 for now.
        # Read it once instead of re-parsing it for every request.
        flush_threshold = int(pairConfig.config.get('verify', 'readbuffer'))
        while True:
            address, _, request = socket.recv_multipart()
            with totalcounter.get_lock():
                totalcounter.value += 1
            # The frame arrives as bytes holding the text form of a dict;
            # decode it before parsing (bytes have no .encode on Python 3).
            dictToInsert = ast.literal_eval(request.decode("utf-8"))
            dictToInsert["last_checked"] = datetime.now()
            # Every worker takes the lock while appending, so a record cannot
            # slip in between the flush's snapshot and its clear and get lost.
            with mylock:
                insertbuffer.append(dictToInsert)
                deletebuffer.append(dictToInsert["primarykey"])
            # ... do some timely treatment here - a lot of variable time gets burned here ...
            # result will be result1 and result2, for the sake of simplification I will fill it with random numbers here
            result1 = randint(1, 10)
            result2 = randint(1, 10)
            sys.stdout.write("%s %s insertbuffer: %d, deletebuffer: %d, totalcounter: %d, b: %s, r: %s \r" % (socket.identity.decode("ascii"), dictToInsert["primarykey"], len(insertbuffer), len(deletebuffer), totalcounter.value, result1, result2))
            sys.stdout.flush()
            # Only worker 0 flushes; checking ident first avoids a round trip
            # to the manager process in the other workers.
            if ident == 0 and len(insertbuffer) >= flush_threshold:
                # "with" releases the lock even if a database call raises.
                with mylock:
                    # these 2 methods are inside a class pairController which is not documented here,
                    # it's basically one method for insert_many() and one method for remove(),
                    # each time with the respective buffer as a filter
                    pairController.storePairs("history", insertbuffer[:])
                    pairController.deletePairs("history", deletebuffer[:])
                    # this empties the buffers for all workers:
                    insertbuffer[:] = []
                    deletebuffer[:] = []
            socket.send_multipart([address, b"", b"ok"])
    except KeyboardInterrupt:
        print("\nexit worker")
    finally:
        # linger=0 drops unsent messages so that shutting down cannot block.
        if socket is not None:
            socket.close(linger=0)
def main():
    """Load balancer main loop.

    Binds a ROUTER socket for clients (frontend) and one for workers
    (backend), starts the client and worker processes as daemons, and then
    forwards each client request to the worker that has been idle longest and
    each worker reply back to the client that asked. The loop runs until an
    exception (typically KeyboardInterrupt) ends it; the sockets and the
    context are released on the way out and the exception propagates.
    """
    # Prepare context and sockets
    context = zmq.Context.instance()
    frontend = context.socket(zmq.ROUTER)
    backend = context.socket(zmq.ROUTER)
    try:
        frontend.bind("ipc://frontend.ipc")
        backend.bind("ipc://backend.ipc")

        # Start background tasks
        mylock = multiprocessing.Lock()

        def start(task, *args):
            """Run task(*args) in a daemon process."""
            process = multiprocessing.Process(target=task, args=args)
            process.daemon = True
            process.start()

        for i in range(NBR_CLIENTS):
            start(client_task, i)
        for i in range(NBR_WORKERS):
            start(worker_task, i, insertbuffer, deletebuffer, mylock)

        # Initialize main loop state: identities of idle workers, oldest first
        workers = []
        poller = zmq.Poller()
        # Only poll for requests from backend until workers are available
        poller.register(backend, zmq.POLLIN)

        while True:
            sockets = dict(poller.poll())

            if backend in sockets:
                # Handle worker activity on the backend
                request = backend.recv_multipart()
                worker, _, client = request[:3]
                if not workers:
                    # Poll for clients now that a worker is available
                    poller.register(frontend, zmq.POLLIN)
                workers.append(worker)
                if client != b"READY" and len(request) > 3:
                    # If client reply, send rest back to frontend
                    _, reply = request[3:]
                    frontend.send_multipart([client, b"", reply])

            if frontend in sockets:
                # Get next client request, route to the least recently used
                # worker (the front of the idle list)
                client, _, request = frontend.recv_multipart()
                worker = workers.pop(0)
                backend.send_multipart([worker, b"", client, b"", request])
                if not workers:
                    # Don't poll clients if no workers are available
                    poller.unregister(frontend)
    finally:
        # Clean up. The loop above only ends through an exception, so the
        # cleanup has to live in "finally" to be reached at all. linger=0
        # drops unsent messages so that context.term() cannot block on them.
        backend.close(linger=0)
        frontend.close(linger=0)
        context.term()
if __name__ == "__main__":
    # Script entry point: run the broker until the user interrupts it.
    try:
        main()
    except KeyboardInterrupt:
        # main() loops forever, so Ctrl-C is how the script is stopped;
        # report it with a short message instead of a traceback.
        print("\nexit main")