Динамически настраиваемый фильтр ZMQ как конвейер ETL - PullRequest
0 голосов
/ 20 марта 2019

У меня есть поток сообщений ZMQ, мне нужно отфильтровать нужные и сохранить их в MongoDB. Хитрость в том, чтобы иметь возможность установить условия фильтра динамически .

Я придумал это решение, используя multiprocessing и Flask. По сути, я использую Flask HTTP API для изменения глобального словаря filter_parameters, заданного Manager и используемого совместно с другим процессом, который выполняет сам фильтр zmq. Итак, мой вопрос - это хорошее решение и какова лучшая практика в этом отношении ?

from multiprocessing import Process, Manager
from flask import Flask, jsonify, request
from pymongo import MongoClient

app = Flask(__name__)
manager = Manager()
filter_parameters = manager.dict()
filter_parameters['addresses'] = ['']

def scan(q, filter_parameters):
    import zmq
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.connect("tcp://localhost:5556")
    socket.setsockopt_unicode(zmq.SUBSCRIBE, 'tx')
    client = MongoClient() # localhost
    db = client.transactions_db
    tx_collection = db.tx_collection
    while True:
        recv_str = socket.recv_string()
        recv_str = recv_str.split()
        filter(recv_str, filter_parameters, tx_collection)

def filter(recv_str, filter_parameters, tx_collection):
    if recv_str[0] == 'tx':
        if recv_str[7] == '0':
            if recv_str[2] in filter_parameters['addresses']:
                tx_collection.insert_one({'tx':recv_str}).inserted_id

p = Process(target=scan, args=(filter_parameters))
p.daemon = True
p.start()

@app.route('/zmq_buffer/set_filter_addresses', methods=["POST"])
def set_filter_addresses():
    data_jsn = request.json
    filter_parameters['addresses'] = data_jsn['addresses']
    return jsonify({'addresses':filter_parameters['addresses']}), 200

@app.route('/zmq_buffer/get_filter_addresses', methods=["GET"])
def get_filter_addresses():
    if filter_parameters['addresses']:
        resp = jsonify(filter_parameters['addresses'])
    else:
        resp = 'None'
    return resp, 200

if __name__ == '__main__':
    app.run(port=7050)
    p.terminate()
...