Соединение Производителя RabbitMQ блокирует Потребителя в питоне с Комбу и Пикой - PullRequest
0 голосов
/ 11 марта 2019

У меня странная проблема, с которой я боролся уже пару дней.Он появляется только в «производственной» среде.Это настройка:

Приложение Python 3, которое запускает потребителя очереди (только один) в отдельном потоке.Связь установилась в основном процессе.Приложение работает с uwsgi и nginx на образе docker Alpine Linux в кластере Kubernetes.Уссги порождает 5 процессов.(заканчивается 5 потребителями, каждый в потоке основного приложения)

В моем devbox (только один процесс) все работает как положено.

Есть один производитель, который помещает работу в очередькоторый должен получить от одного из потребителей и получить процессы.Теперь происходит следующее:

  1. приложение запущено, потребители ждут работы
  2. я запускаю производителя
  3. я помещаю новую работу в очередь
  4. работа в очереди (проверьте управление rabbitmq)
  5. потребитель ничего не делает
  6. я ухожу от производителя
  7. потребитель ничего не делает
  8. я снова запускаю производителя
  9. потребитель видит новую работу
  10. необходимо остановить и снова запустить производителя, чтобы потребитель закончил работу над работой

Я начал использовать Пика и коммутируемыйКомбу, потому что мне было трудно, это был вопрос библиотеки.Но оба ведут себя одинаково.

Вот несколько упрощенных кодов, как пишутся потребитель и производитель:

consumer.py:

import time
from kombu import Connection, BrokerConnection, Exchange, Queue
from kombu.pools import connections
from kombu.mixins import ConsumerMixin



class QueueListenerKombu(ConsumerMixin, threading.Thread):
    queue = None
    connection = None

    def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None):
        super().__init__(group, target, name, args, kwargs, daemon=daemon)
        self.connection = kwargs.get('connection')
        self.queue = kwargs.get('queue')

    def get_consumers(self, Consumer, channel):
        return [Consumer(queues=self.queue,
                     accept=['json'],
                     callbacks=[self._do_work],
                     prefetch_count=1)]

    def _do_work(self, job_data, message):
        logger.debug("got work")
        message.ack()


class WorkQueueKombu( object ):
    channel = None
    connection = None
    responsequeue_callback = None
    credentials = None
    _queue_listeners = None

    def __init__(self, server="localhost", username="admin", password=""):
        super(WorkQueueKombu, self).__init__()
        self.credentials = {"host": server, "username": username, "password": password, 'port':5672}
        self._connect()
        self._queue_listeners = {}
        self.parameters = None


    def _connect(self):
        connection_string = 'amqp://{username}:{password}@{host}:{port}//'.format(
        username=self.credentials.get('username'),
        password=self.credentials.get('password'),
        host=self.credentials.get('host'),
        port=self.credentials.get('port'))

        self.connection = BrokerConnection(connection_string)

    def _startNewQueueListener(self, queue) -> bool:
        job_exchange = Exchange('alderaanjobs', type='direct')
        job_queues = [Queue(queue, job_exchange, routing_key=queue)]

        listener = QueueListenerKombu( kwargs={"connection":self.connection, "queue": job_queues})
        self._queue_listeners[queue] = listener
        listener.start()

    def start_receiving_work(self, queue):
        self._startNewQueueListener(queue)

и это publisher.py:

class WorkQueue( object ):
    channel = None
    connection = None

    def __init__(self, server="localhost", username="admin", password=""):
        super(WorkQueue, self).__init__()
        self._connect(server, username, password)
        self._queue_listeners = {}

    def _connect(self, server="localhost", username="admin", password=""):
        connection_string = 'amqp://{username}:{password}@{host}:{port}//'.format(
        username=username,
        password=password,
        host=server,
        port=5672)

        self.connection = BrokerConnection(connection_string)
        self.connection.connect()

    def place_new_work(self, queue, job:dict):
        job_exchange = Exchange('alderaanjobs', type='direct')

        with producers[self.connection].acquire(block=False) as producer:
            producer.publish(job,
                         serializer='json',
                         #compression='bzip2',
                         exchange=job_exchange,
                         declare=[job_exchange],
                         routing_key=queue)
...