Как подтвердить и удалить запись в Redis с помощью Kombu - PullRequest
0 голосов
/ 17 мая 2019

Это для паба / суб-модели с использованием Redis и Kombu.Я могу подтвердить ключ / значение, используя Kombu, но после подтверждения записи, запись удаляется.Мне нужна такая опция, как ключ / значение должно быть помечено как прочитанное (возможно, используется опция флага), и после того, как потребитель успешно использует его, необходимо отправить подтверждение для удаления записи.Между тем, после того, как запись помечена как прочитанная, она не должна быть доступна для другого потребителя.

Я могу подтвердить сообщение с помощью комбу, но запись удаляется.Если потребитель не может обработать сообщение или оно выходит из строя, ключ / значение будут потеряны.Поэтому я хочу две опции, например, ключ / значение должны быть помечены как прочитанные, а запись должна быть удалена только после успешного ее использования потребителем.

Для соединения вызывается очередь

from kombu import Connection

class Logger(object):

    def __init__(self, connection, queue_name='mail_qu',
            serializer='json', compression=None):
        self.queue = connection.SimpleQueue(queue_name)
        self.serializer = serializer
        self.compression = compression

    def log(self, message):
        self.queue.put({'message': message},serializer=self.serializer,
                        compression=self.compression)

    def process(self):
        while(True):
            log_message = self.queue.get(block=True, timeout=1)
            entry = log_message.payload # deserialized data.

            log_message.ack() # remove message from queue

            print("callback",aa)
    def close(self):
        self.queue.close()

Производитель....

from kombu import Connection
from mail_qu import Logger
if __name__ == '__main__':
    from contextlib import closing

    with Connection('redis://localhost:6379') as conn:
        with closing(Logger(conn)) as logger:


            logger.log('keyyyy')

потребитель вызван ....

from kombu import Connection
from mail_qu import Logger
import time
if __name__ == '__main__':
    from contextlib import closing

    with Connection('redis://localhost:6379') as conn:
        with closing(Logger(conn)) as logger:

            logger.process()

Использование log_message.ack() Я могу успешно удалить запись после прочтения сообщения.Но я хочу, чтобы опция была такой: сообщение должно быть помечено как прочитанное, после того как оно прочитано, а после того, как оно было успешно использовано потребителем, запись должна быть удалена.

...