Flask SocketIO Eventlet Вторая ошибка одновременного чтения - PullRequest
0 голосов
/ 24 мая 2018

Я пытаюсь использовать Flask-Socketio для подключения к 3 отдельным очередям RabbitMQ: - 1, которая прослушивает сообщение об обновлении конфигурации, - и 2, которые определены в базе данных. При запуске сервера я подключаюсь к базе данных, получаяконфиги там и начинающие потребители.Затем в веб-интерфейсе, если один из параметров конфигурации изменяется, эти изменения записываются в базу данных, а сообщение об обновлении конфигурации добавляется в первую очередь RabbitMQ.В идеале, это может привести к отключению потребителя Pika, который у меня в данный момент запущен, присоединению этого потока и повторному запуску другого потока с новой информацией о конфигурации.

Все, что я только что изложил, работает,но при первой попытке выключить потребителя я всегда получаю сообщение об ошибке:

There was an error stopping the consumer: Second simultaneous read on fileno x detected.  
Unless you really know what you're doing, make sure that only one greenthread can read any particular socket.
Consider using a pools.Pool. If you do know what you're doing and want to disable this error, call eventlet.debug.hub_prevent_multiple_readers(False)...

Потребители в конечном итоге закрываются и перезапускаются, однако я хотел бы понять, почему это происходит, икак я мог изменить свой код, чтобы остановить его.Место, где всегда происходит ошибка, находится между этими двумя функциями, первая находится в моем классе Consumer, а вторая в моем классе Queue:

def run(self):        
     while True:
        self.go = True
        self.message_queue = Queue(self.configs, self.go, self.mongo_config)
        self.message_queue.start()
        # here I wait for an event which I set when the config is updated
        self.event.wait() 
        self.go = False
        setattr(self.message_queue, 'go', False)
        new_config = self.refresh_configs()
        setattr(self, 'configs', new_config)
        # when this is called, it should close the existing connection and join the thread
        self.message_queue.refresh_connection()
        self.message_queue.join()

def refresh_connection(self):
    while True:
        if not self.go:
            break
        self.rmq_connection = rabbitmq_consumer(...)
        self.rmq_connection.start_consuming()
    self._lock.acquire()
    try:
        # right here is where the second read error occurs
        self.rmq_connection.stop_consuming()
        self.rmq_connection.close()
    except Exception as e:
        print('There was an error stopping the consumer: {0}'.format(e))
    self._lock.release()

Ниже приведен гораздо более полный примеркод, на случай, если он поможет пролить свет на проблему.

thread = None
thread_lock = Lock()
event = Event()

class Queue(Thread):
    def __init__(self, configs, go, outbound):
        Thread.__init__(self)
        self._lock = eventlet.semaphore.Semaphore(1)
        self.go = go
        self.configs = configs
        self.outbound = outbound
        ...
        self.rmq_connection = None

    def on_rmq_message(self, ...):
        ...
        self._lock.acquire()
        socketio.emit('eventEmit', {'data': results}, namespace='/')
        result = rabbitmq_producer(...)
        self._lock.release()

    def refresh_connection(self):
        while True:
            if not self.go:
                break
            self.rmq_connection = rabbitmq_consumer(...)
            self.rmq_connection.start_consuming()
        self._lock.acquire()
        try:
            self.rmq_connection.stop_consuming()
            self.rmq_connection.close()
        except Exception as e:
            print('There was an error stopping the consumer: {0}'.format(e))
        self._lock.release()

    def run(self):
        self.refresh_connection()


class Consumer(Thread):
    def __init__(self, configs, event, channel, mongo_config):
        Thread.__init__(self)
        self.configs = configs
        self.mongo_config = mongo_config
        self.event = event
        self.message_queue = None
        self.go = None
        ...

    def refresh_configs(self):
        r = None
        mconnection = connect_mongodb(...)
        results = retrieve(...)
        for result in results.data:
            if result.get('channel') == self.channel:
                r = result
        return r

    def run(self):
        while True:
            self.go = True
            self.message_queue = Queue(self.configs, self.go, self.mongo_config)
            self.message_queue.start()
            self.event.wait()
            self.go = False
            setattr(self.message_queue, 'go', False)
            new_config = self.refresh_configs()
            setattr(self, 'configs', new_config)
            self.message_queue.refresh_connection()
            self.message_queue.join()


class Producer(Thread):
    def __init__(self, configs, event):
        Thread.__init__(self)
        self._lock = eventlet.semaphore.Semaphore(1)
        self.configs = configs
        self.event = event
        ...
        self.channel = self.configs.get('channel', None)
        self.config_consumer = None

    def on_config_message(self, ...):
        ...
        self._lock.acquire()
        socketio.emit('configEmit', {'data': results}, namespace='/')
        self._lock.release()
        self.event.set()
        self.event.clear()

    def run(self):
        self.config_consumer = rabbitmq_consumer(...)
        self.config_consumer.start_consuming()


def init_connections():
    with app.test_request_context('/'):
        mconnection = connect_mongodb(...)

        results = retrieve(mconnection.engine, mongo_collection, mongo_db, cursor=False)
        ...

        t1 = Producer(configs, event)
        for result in results.data:
            config = {
                ...
            }
            t2 = Consumer(result, event, result['channel'], config)
            t2.start()
        t1.start()
        t1.join()
        t2.join()


@socketio.on('connect')
def handle_connection():
    global thread
    socketio.emit('connectEmit', {'data': 'Connected!'}, namespace='/')
    with thread_lock:
        if thread is None:
            thread = socketio.start_background_task(target=init_connections)

Спасибо за любую помощь, которую вы можете предложить.

...