Это для паба / суб-модели с использованием Redis и Kombu.Я могу подтвердить ключ / значение, используя Kombu, но после подтверждения записи, запись удаляется.Мне нужна такая опция, как ключ / значение должно быть помечено как прочитанное (возможно, используется опция флага), и после того, как потребитель успешно использует его, необходимо отправить подтверждение для удаления записи.Между тем, после того, как запись помечена как прочитанная, она не должна быть доступна для другого потребителя.
Я могу подтвердить сообщение с помощью комбу, но запись удаляется.Если потребитель не может обработать сообщение или оно выходит из строя, ключ / значение будут потеряны.Поэтому я хочу две опции, например, ключ / значение должны быть помечены как прочитанные, а запись должна быть удалена только после успешного ее использования потребителем.
Для соединения вызывается очередь
from kombu import Connection
class Logger(object):
def __init__(self, connection, queue_name='mail_qu',
serializer='json', compression=None):
self.queue = connection.SimpleQueue(queue_name)
self.serializer = serializer
self.compression = compression
def log(self, message):
self.queue.put({'message': message},serializer=self.serializer,
compression=self.compression)
def process(self):
while(True):
log_message = self.queue.get(block=True, timeout=1)
entry = log_message.payload # deserialized data.
log_message.ack() # remove message from queue
print("callback",aa)
def close(self):
self.queue.close()
Производитель....
from kombu import Connection
from mail_qu import Logger
if __name__ == '__main__':
from contextlib import closing
with Connection('redis://localhost:6379') as conn:
with closing(Logger(conn)) as logger:
logger.log('keyyyy')
потребитель вызван ....
from kombu import Connection
from mail_qu import Logger
import time
if __name__ == '__main__':
from contextlib import closing
with Connection('redis://localhost:6379') as conn:
with closing(Logger(conn)) as logger:
logger.process()
Использование log_message.ack()
Я могу успешно удалить запись после прочтения сообщения.Но я хочу, чтобы опция была такой: сообщение должно быть помечено как прочитанное, после того как оно прочитано, а после того, как оно было успешно использовано потребителем, запись должна быть удалена.