У меня странная проблема, с которой я боролся уже пару дней.Он появляется только в «производственной» среде.Это настройка:
Приложение Python 3, которое запускает потребителя очереди (только один) в отдельном потоке.Связь установилась в основном процессе.Приложение работает с uwsgi и nginx на образе docker Alpine Linux в кластере Kubernetes.Уссги порождает 5 процессов.(заканчивается 5 потребителями, каждый в потоке основного приложения)
В моем devbox (только один процесс) все работает как положено.
Есть один производитель, который помещает работу в очередькоторый должен получить от одного из потребителей и получить процессы.Теперь происходит следующее:
- приложение запущено, потребители ждут работы
- я запускаю производителя
- я помещаю новую работу в очередь
- работа в очереди (проверьте управление rabbitmq)
- потребитель ничего не делает
- я ухожу от производителя
- потребитель ничего не делает
- я снова запускаю производителя
- потребитель видит новую работу
- необходимо остановить и снова запустить производителя, чтобы потребитель закончил работу над работой
Я начал использовать Пика и коммутируемыйКомбу, потому что мне было трудно, это был вопрос библиотеки.Но оба ведут себя одинаково.
Вот несколько упрощенных кодов, как пишутся потребитель и производитель:
consumer.py:
import time
from kombu import Connection, BrokerConnection, Exchange, Queue
from kombu.pools import connections
from kombu.mixins import ConsumerMixin
class QueueListenerKombu(ConsumerMixin, threading.Thread):
queue = None
connection = None
def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None):
super().__init__(group, target, name, args, kwargs, daemon=daemon)
self.connection = kwargs.get('connection')
self.queue = kwargs.get('queue')
def get_consumers(self, Consumer, channel):
return [Consumer(queues=self.queue,
accept=['json'],
callbacks=[self._do_work],
prefetch_count=1)]
def _do_work(self, job_data, message):
logger.debug("got work")
message.ack()
class WorkQueueKombu( object ):
channel = None
connection = None
responsequeue_callback = None
credentials = None
_queue_listeners = None
def __init__(self, server="localhost", username="admin", password=""):
super(WorkQueueKombu, self).__init__()
self.credentials = {"host": server, "username": username, "password": password, 'port':5672}
self._connect()
self._queue_listeners = {}
self.parameters = None
def _connect(self):
connection_string = 'amqp://{username}:{password}@{host}:{port}//'.format(
username=self.credentials.get('username'),
password=self.credentials.get('password'),
host=self.credentials.get('host'),
port=self.credentials.get('port'))
self.connection = BrokerConnection(connection_string)
def _startNewQueueListener(self, queue) -> bool:
job_exchange = Exchange('alderaanjobs', type='direct')
job_queues = [Queue(queue, job_exchange, routing_key=queue)]
listener = QueueListenerKombu( kwargs={"connection":self.connection, "queue": job_queues})
self._queue_listeners[queue] = listener
listener.start()
def start_receiving_work(self, queue):
self._startNewQueueListener(queue)
и это publisher.py:
class WorkQueue( object ):
channel = None
connection = None
def __init__(self, server="localhost", username="admin", password=""):
super(WorkQueue, self).__init__()
self._connect(server, username, password)
self._queue_listeners = {}
def _connect(self, server="localhost", username="admin", password=""):
connection_string = 'amqp://{username}:{password}@{host}:{port}//'.format(
username=username,
password=password,
host=server,
port=5672)
self.connection = BrokerConnection(connection_string)
self.connection.connect()
def place_new_work(self, queue, job:dict):
job_exchange = Exchange('alderaanjobs', type='direct')
with producers[self.connection].acquire(block=False) as producer:
producer.publish(job,
serializer='json',
#compression='bzip2',
exchange=job_exchange,
declare=[job_exchange],
routing_key=queue)