Я пытаюсь использовать Flask-Socketio для подключения к 3 отдельным очередям RabbitMQ: - 1, которая прослушивает сообщение об обновлении конфигурации, - и 2, которые определены в базе данных. При запуске сервера я подключаюсь к базе данных, получаяконфиги там и начинающие потребители.Затем в веб-интерфейсе, если один из параметров конфигурации изменяется, эти изменения записываются в базу данных, а сообщение об обновлении конфигурации добавляется в первую очередь RabbitMQ.В идеале, это может привести к отключению потребителя Pika, который у меня в данный момент запущен, присоединению этого потока и повторному запуску другого потока с новой информацией о конфигурации.
Все, что я только что изложил, работает,но при первой попытке выключить потребителя я всегда получаю сообщение об ошибке:
There was an error stopping the consumer: Second simultaneous read on fileno x detected.
Unless you really know what you're doing, make sure that only one greenthread can read any particular socket.
Consider using a pools.Pool. If you do know what you're doing and want to disable this error, call eventlet.debug.hub_prevent_multiple_readers(False)...
Потребители в конечном итоге закрываются и перезапускаются, однако я хотел бы понять, почему это происходит, икак я мог изменить свой код, чтобы остановить его.Место, где всегда происходит ошибка, находится между этими двумя функциями, первая находится в моем классе Consumer, а вторая в моем классе Queue:
def run(self):
while True:
self.go = True
self.message_queue = Queue(self.configs, self.go, self.mongo_config)
self.message_queue.start()
# here I wait for an event which I set when the config is updated
self.event.wait()
self.go = False
setattr(self.message_queue, 'go', False)
new_config = self.refresh_configs()
setattr(self, 'configs', new_config)
# when this is called, it should close the existing connection and join the thread
self.message_queue.refresh_connection()
self.message_queue.join()
def refresh_connection(self):
while True:
if not self.go:
break
self.rmq_connection = rabbitmq_consumer(...)
self.rmq_connection.start_consuming()
self._lock.acquire()
try:
# right here is where the second read error occurs
self.rmq_connection.stop_consuming()
self.rmq_connection.close()
except Exception as e:
print('There was an error stopping the consumer: {0}'.format(e))
self._lock.release()
Ниже приведен гораздо более полный примеркод, на случай, если он поможет пролить свет на проблему.
thread = None
thread_lock = Lock()
event = Event()
class Queue(Thread):
def __init__(self, configs, go, outbound):
Thread.__init__(self)
self._lock = eventlet.semaphore.Semaphore(1)
self.go = go
self.configs = configs
self.outbound = outbound
...
self.rmq_connection = None
def on_rmq_message(self, ...):
...
self._lock.acquire()
socketio.emit('eventEmit', {'data': results}, namespace='/')
result = rabbitmq_producer(...)
self._lock.release()
def refresh_connection(self):
while True:
if not self.go:
break
self.rmq_connection = rabbitmq_consumer(...)
self.rmq_connection.start_consuming()
self._lock.acquire()
try:
self.rmq_connection.stop_consuming()
self.rmq_connection.close()
except Exception as e:
print('There was an error stopping the consumer: {0}'.format(e))
self._lock.release()
def run(self):
self.refresh_connection()
class Consumer(Thread):
def __init__(self, configs, event, channel, mongo_config):
Thread.__init__(self)
self.configs = configs
self.mongo_config = mongo_config
self.event = event
self.message_queue = None
self.go = None
...
def refresh_configs(self):
r = None
mconnection = connect_mongodb(...)
results = retrieve(...)
for result in results.data:
if result.get('channel') == self.channel:
r = result
return r
def run(self):
while True:
self.go = True
self.message_queue = Queue(self.configs, self.go, self.mongo_config)
self.message_queue.start()
self.event.wait()
self.go = False
setattr(self.message_queue, 'go', False)
new_config = self.refresh_configs()
setattr(self, 'configs', new_config)
self.message_queue.refresh_connection()
self.message_queue.join()
class Producer(Thread):
def __init__(self, configs, event):
Thread.__init__(self)
self._lock = eventlet.semaphore.Semaphore(1)
self.configs = configs
self.event = event
...
self.channel = self.configs.get('channel', None)
self.config_consumer = None
def on_config_message(self, ...):
...
self._lock.acquire()
socketio.emit('configEmit', {'data': results}, namespace='/')
self._lock.release()
self.event.set()
self.event.clear()
def run(self):
self.config_consumer = rabbitmq_consumer(...)
self.config_consumer.start_consuming()
def init_connections():
with app.test_request_context('/'):
mconnection = connect_mongodb(...)
results = retrieve(mconnection.engine, mongo_collection, mongo_db, cursor=False)
...
t1 = Producer(configs, event)
for result in results.data:
config = {
...
}
t2 = Consumer(result, event, result['channel'], config)
t2.start()
t1.start()
t1.join()
t2.join()
@socketio.on('connect')
def handle_connection():
global thread
socketio.emit('connectEmit', {'data': 'Connected!'}, namespace='/')
with thread_lock:
if thread is None:
thread = socketio.start_background_task(target=init_connections)
Спасибо за любую помощь, которую вы можете предложить.