У меня есть поток сообщений ZMQ, мне нужно отфильтровать нужные и сохранить их в MongoDB. Хитрость в том, чтобы иметь возможность установить условия фильтра динамически .
Я придумал это решение, используя multiprocessing
и Flask
. По сути, я использую Flask HTTP API для изменения глобального словаря filter_parameters
, заданного Manager
и используемого совместно с другим процессом, который выполняет сам фильтр zmq. Итак, мой вопрос - это хорошее решение и какова лучшая практика в этом отношении ?
from multiprocessing import Process, Manager
from flask import Flask, jsonify, request
from pymongo import MongoClient
app = Flask(__name__)
manager = Manager()
filter_parameters = manager.dict()
filter_parameters['addresses'] = ['']
def scan(q, filter_parameters):
import zmq
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:5556")
socket.setsockopt_unicode(zmq.SUBSCRIBE, 'tx')
client = MongoClient() # localhost
db = client.transactions_db
tx_collection = db.tx_collection
while True:
recv_str = socket.recv_string()
recv_str = recv_str.split()
filter(recv_str, filter_parameters, tx_collection)
def filter(recv_str, filter_parameters, tx_collection):
if recv_str[0] == 'tx':
if recv_str[7] == '0':
if recv_str[2] in filter_parameters['addresses']:
tx_collection.insert_one({'tx':recv_str}).inserted_id
p = Process(target=scan, args=(filter_parameters))
p.daemon = True
p.start()
@app.route('/zmq_buffer/set_filter_addresses', methods=["POST"])
def set_filter_addresses():
data_jsn = request.json
filter_parameters['addresses'] = data_jsn['addresses']
return jsonify({'addresses':filter_parameters['addresses']}), 200
@app.route('/zmq_buffer/get_filter_addresses', methods=["GET"])
def get_filter_addresses():
if filter_parameters['addresses']:
resp = jsonify(filter_parameters['addresses'])
else:
resp = 'None'
return resp, 200
if __name__ == '__main__':
app.run(port=7050)
p.terminate()