Это заняло некоторое время, и с каждым шагом становилось все более загадочным.
Я довольно эффективно использую Kombu SimpleQueue для отправки сообщений в задачу Celery. Хотя все мои проблемы были решены, чтобы быть честными, потому что это то, что мне нужно, и SimpleQueue предлагает это элегантно. До ...
Чтобы поместить это в контекст, у меня есть сайт Django и задача Celery, все прекрасно работают вместе.
Я могу поместить сообщение в очередь и прочитать его следующим образом:
from time import sleep
from celery import current_app
from celery.utils.log import get_logger
from kombu.simple import SimpleQueue
logger = get_logger(__name__)
def q_name(task_id):
return f'MySimpleQueueFor_{task_id}'
def send_to_task(task_id, message):
with Connection(current_app.conf.broker_write_url) as conn:
q = conn.SimpleQueue(q_name(task_id))
q.put(message)
print(f'Sent: {message} to {q_name(task_id)}')
q.close()
def check_for_message(q, task_id):
try:
message = q.get_nowait() # Raises an Empty exception if the Queue is empty
payload = message.payload
message.ack() # remove message from queue
q.close()
return payload
except q.Empty:
return None
@app.task(bind=True)
def my_task(self, *args, **kwargs):
with Connection(current_app.conf.broker_read_url) as conn:
task_id = self.request.id
q = conn.SimpleQueue(q_name(task_id))
while True:
message = check_for_message(q, task_id)
logger.info(f'Received: {message} from {q.name}')
sleep(0.1)
Имея идентификатор_задачи в руке, я могу отправить любое сообщение в задачу I и увидеть записанные сообщения по заданию. Это просто потрясающе. Любить это.
Но теперь, если я пытаюсь выполнить блокировку, то есть задача дождаться сообщения с небольшим изменением:
from time import sleep
from celery import current_app
from celery.utils.log import get_logger
from kombu.simple import SimpleQueue
logger = get_logger(__name__)
def q_name(task_id):
return f'MySimpleQueueFor_{task_id}'
def send_to_task(task_id, message):
with Connection(current_app.conf.broker_write_url) as conn:
q = conn.SimpleQueue(q_name(task_id))
print(f'DEBUG "{q.queue.as_dict()=}"')
q.put(message)
print(f'Sent: {message} to {q_name(task_id)}')
q.close()
def wait_for_message(q, task_id):
logger.info(f'DEBUG "{q.queue.as_dict()=}"')
message = q.get()
payload = message.payload
message.ack()
q.close()
return payload
@app.task(bind=True)
def my_task(self, *args, **kwargs):
with Connection(current_app.conf.broker_read_url) as conn:
task_id = self.request.id
q = conn.SimpleQueue(q_name(task_id))
while True:
message = wait_for_message(q, task_id)
logger.info(f'Received: {message} from {q.name}')
sleep(0.1)
Получено первое отправленное мной сообщение. Второе нет. Звонок на SimpleQueue.get()
никогда не возвращается, он ждет бесконечно. Я могу продолжать отправлять сообщения, они никогда не завершаются.
Если я повторно реализую SimpleQueue.get()
в терминах SimpleQueue.get_nowait()
(используя check_for_message()
выше) следующим образом:
def wait_for_message(q, task_id):
logger.info(f'DEBUG "{q.queue.as_dict()=}"')
while message is None:
message = check_for_message(q, task_id)
return message
Тогда задача блокируется на wait_for_message()
отлично, и получает каждое сообщение, которое я посылаю.
Я сбросил конфиги очереди, чтобы проверить любые странные настройки, но приведенные выше дампы q.as_dict()
идентичны на конце отправителя и получателя и показывают нечетные настройки:
"q.queue.as_dict()={
'name': 'MySimpleQueueFor_685a78db-51c1-4c40-b609-f10eed02dd7d',
'exchange': <Exchange MySimpleQueueFor_685a78db-51c1-4c40-b609-f10eed02dd7d(direct) bound to chan:1>,
'routing_key': 'MySimpleQueueFor_685a78db-51c1-4c40-b609-f10eed02dd7d',
'queue_arguments': {},
'binding_arguments': None,
'consumer_arguments': None,
'durable': True,
'exclusive': False,
'auto_delete': False,
'no_ack': False,
'alias': None,
'bindings': [],
'no_declare': None,
'expires': None,
'message_ttl': None,
'max_length': None,
'max_length_bytes': None,
'max_priority': None
}"
Когда я Посмотрите на реализации SimpleQueue. Я вижу, что их реализация получения блокировки более сложна и потребует некоторого размышления и диагностики, прежде чем я смогу понять, какая часть этого ломается.
От: https://docs.celeryproject.org/projects/kombu/en/stable/_modules/queue.html
def get(self, block=True, timeout=None):
if not block:
return self.get_nowait()
self._consume()
time_start = monotonic()
remaining = timeout
while True:
if self.buffer:
return self.buffer.popleft()
if remaining is not None and remaining <= 0.0:
raise self.Empty()
try:
# The `drain_events` method will
# block on the socket connection to rabbitmq. if any
# application-level messages are received, it will put them
# into `self.buffer`.
# * The method will block for UP TO `timeout` milliseconds.
# * The method may raise a socket.timeout exception; or...
# * The method may return without having put anything on
# `self.buffer`. This is because internal heartbeat
# messages are sent over the same socket; also POSIX makes
# no guarantees against socket calls returning early.
self.channel.connection.client.drain_events(timeout=remaining)
except socket.timeout:
raise self.Empty()
if remaining is not None:
elapsed = monotonic() - time_start
remaining = timeout - elapsed
def get_nowait(self):
m = self.queue.get(no_ack=self.no_ack)
if not m:
raise self.Empty()
return m
Основное различие заключается в том, что они опираются на self.channel.connection.client.drain_events()
, который блокирует, и, таким образом, похоже, что проблема заключается именно в этом. Тайна становится глубже, и мне интересно, есть ли у кого-то похожий опыт или опыт с Комбу, который может пролить свет на то, почему self.channel.connection.client.drain_events()
никогда не возвращается, несмотря на то, что сообщения находятся в очереди! Я, конечно, буду углубляться во времени, но, как обычно, после этого я отправляю вопрос в надежде получить ответ, который сэкономит мне время, когда я в следующий раз сижу, чтобы провести более глубокий диагноз (углубление в сток-события, если мне придется ).