У меня есть приложение Django, которое должно постоянно прослушивать сообщения от Kafka и затем отправлять их через WebSocket клиенту. Проблема в том, как настроить постоянный слушатель. Для будущей масштабируемости мы решили задействовать Celery в проекте для решения этих проблем с масштабированием.
Моя задача на самом деле выглядит следующим образом:
class ConsumerTask(Task):
name = 'consume_messages'
def run(self, *args, **kwargs):
consumer = get_kafka_consumer(settings.KAFKA_URL,
settings.FAULT_MESSAGES_KAFKA_TOPIC,
'consumer_messages_group')
logger.info("Kafka's consumer has been started")
while True:
messages = consumer.poll()
for _, messages in messages.items():
messages, messages_count = self.get_message(messages)
if messages_count > 0:
messages = save_to_db()
send_via_websocket_messages(messages)
Он правильно сохраняет и отправляет сообщения через WS, но проблемы возникают из бесконечного l oop в задании. По какой-то причине (возможно, из-за ограничения времени ожидания задачи) задание появляется в очереди и никогда не запускается снова. Я не уверен, что демонизирующие работники сельдерея решат эту проблему. Не могли бы вы предоставить некоторые стратегии, которые теперь организуют «Постоянно работающую часть» этого процесса?