Celery / Kombu SimpleQueue.get () никогда не возвращается, даже когда сообщение появляется в очереди. Почему? - PullRequest
0 голосов
/ 09 марта 2020

Это заняло некоторое время, и с каждым шагом становилось все более загадочным.

Я довольно эффективно использую Kombu SimpleQueue для отправки сообщений в задачу Celery. Хотя все мои проблемы были решены, чтобы быть честными, потому что это то, что мне нужно, и SimpleQueue предлагает это элегантно. До ...

Чтобы поместить это в контекст, у меня есть сайт Django и задача Celery, все прекрасно работают вместе.

Я могу поместить сообщение в очередь и прочитать его следующим образом:

from time import sleep
from celery import current_app
from celery.utils.log import get_logger
from kombu.simple import SimpleQueue

logger = get_logger(__name__)

def q_name(task_id):
    return f'MySimpleQueueFor_{task_id}'

def send_to_task(task_id, message):
    with Connection(current_app.conf.broker_write_url) as conn:
        q = conn.SimpleQueue(q_name(task_id))
        q.put(message)
        print(f'Sent: {message} to {q_name(task_id)}')
        q.close()

def check_for_message(q, task_id):
    try:
        message = q.get_nowait()            # Raises an Empty exception if the Queue is empty
        payload = message.payload
        message.ack()                       # remove message from queue
        q.close()
        return payload

    except q.Empty:
        return None

@app.task(bind=True)
def my_task(self, *args, **kwargs):
    with Connection(current_app.conf.broker_read_url) as conn:
        task_id = self.request.id
        q = conn.SimpleQueue(q_name(task_id))
        while True:
            message = check_for_message(q, task_id)
            logger.info(f'Received: {message} from {q.name}')
            sleep(0.1) 

Имея идентификатор_задачи в руке, я могу отправить любое сообщение в задачу I и увидеть записанные сообщения по заданию. Это просто потрясающе. Любить это.

Но теперь, если я пытаюсь выполнить блокировку, то есть задача дождаться сообщения с небольшим изменением:

from time import sleep
from celery import current_app
from celery.utils.log import get_logger
from kombu.simple import SimpleQueue

logger = get_logger(__name__)

def q_name(task_id):
    return f'MySimpleQueueFor_{task_id}'

def send_to_task(task_id, message):
    with Connection(current_app.conf.broker_write_url) as conn:
        q = conn.SimpleQueue(q_name(task_id))
        print(f'DEBUG "{q.queue.as_dict()=}"')
        q.put(message)
        print(f'Sent: {message} to {q_name(task_id)}')
        q.close()

def wait_for_message(q, task_id):
    logger.info(f'DEBUG "{q.queue.as_dict()=}"')
    message = q.get()
    payload = message.payload
    message.ack()
    q.close()
    return payload

@app.task(bind=True)
def my_task(self, *args, **kwargs):
    with Connection(current_app.conf.broker_read_url) as conn:
        task_id = self.request.id
        q = conn.SimpleQueue(q_name(task_id))
        while True:
            message = wait_for_message(q, task_id)
            logger.info(f'Received: {message} from {q.name}')
            sleep(0.1) 

Получено первое отправленное мной сообщение. Второе нет. Звонок на SimpleQueue.get() никогда не возвращается, он ждет бесконечно. Я могу продолжать отправлять сообщения, они никогда не завершаются.

Если я повторно реализую SimpleQueue.get() в терминах SimpleQueue.get_nowait() (используя check_for_message() выше) следующим образом:

def wait_for_message(q, task_id):
    logger.info(f'DEBUG "{q.queue.as_dict()=}"')
    while message is None:
        message = check_for_message(q, task_id)
    return message

Тогда задача блокируется на wait_for_message() отлично, и получает каждое сообщение, которое я посылаю.

Я сбросил конфиги очереди, чтобы проверить любые странные настройки, но приведенные выше дампы q.as_dict() идентичны на конце отправителя и получателя и показывают нечетные настройки:

"q.queue.as_dict()={
    'name': 'MySimpleQueueFor_685a78db-51c1-4c40-b609-f10eed02dd7d', 
    'exchange': <Exchange MySimpleQueueFor_685a78db-51c1-4c40-b609-f10eed02dd7d(direct) bound to chan:1>, 
    'routing_key': 'MySimpleQueueFor_685a78db-51c1-4c40-b609-f10eed02dd7d', 
    'queue_arguments': {}, 
    'binding_arguments': None, 
    'consumer_arguments': None, 
    'durable': True, 
    'exclusive': False, 
    'auto_delete': False, 
    'no_ack': False, 
    'alias': None, 
    'bindings': [], 
    'no_declare': None, 
    'expires': None, 
    'message_ttl': None, 
    'max_length': None, 
    'max_length_bytes': None, 
    'max_priority': None
}"

Когда я Посмотрите на реализации SimpleQueue. Я вижу, что их реализация получения блокировки более сложна и потребует некоторого размышления и диагностики, прежде чем я смогу понять, какая часть этого ломается.

От: https://docs.celeryproject.org/projects/kombu/en/stable/_modules/queue.html

    def get(self, block=True, timeout=None):
        if not block:
            return self.get_nowait()

        self._consume()

        time_start = monotonic()
        remaining = timeout
        while True:
            if self.buffer:
                return self.buffer.popleft()

            if remaining is not None and remaining <= 0.0:
                raise self.Empty()

            try:
                # The `drain_events` method will
                # block on the socket connection to rabbitmq. if any
                # application-level messages are received, it will put them
                # into `self.buffer`.
                # * The method will block for UP TO `timeout` milliseconds.
                # * The method may raise a socket.timeout exception; or...
                # * The method may return without having put anything on
                #    `self.buffer`.  This is because internal heartbeat
                #    messages are sent over the same socket; also POSIX makes
                #    no guarantees against socket calls returning early.
                self.channel.connection.client.drain_events(timeout=remaining)
            except socket.timeout:
                raise self.Empty()

            if remaining is not None:
                elapsed = monotonic() - time_start
                remaining = timeout - elapsed

    def get_nowait(self):
        m = self.queue.get(no_ack=self.no_ack)
        if not m:
            raise self.Empty()
        return m

Основное различие заключается в том, что они опираются на self.channel.connection.client.drain_events(), который блокирует, и, таким образом, похоже, что проблема заключается именно в этом. Тайна становится глубже, и мне интересно, есть ли у кого-то похожий опыт или опыт с Комбу, который может пролить свет на то, почему self.channel.connection.client.drain_events() никогда не возвращается, несмотря на то, что сообщения находятся в очереди! Я, конечно, буду углубляться во времени, но, как обычно, после этого я отправляю вопрос в надежде получить ответ, который сэкономит мне время, когда я в следующий раз сижу, чтобы провести более глубокий диагноз (углубление в сток-события, если мне придется ).

...