Словарь + структура данных очереди с активным удалением старых сообщений - PullRequest
1 голос
/ 04 мая 2011

Я хотел бы создать структуру данных, которая представляет собой набор очередей (в идеале - поиск по хэшу, карте или по типу поиска), где сообщения в очередях активно удаляются после достижения определенного возраста.Значение ttl будет глобальным;сообщения не должны были бы иметь индивидуальные ttl.Разрешение для ttl не должно быть очень точным - только в течение секунды или около того.

Я даже не уверен, что искать здесь.Я мог бы создать отдельную глобальную очередь, которую фоновый поток отслеживает, просматривает и вытягивает указатели на сообщения из глобальной очереди, которые сообщают ему об удалении элементов из отдельных очередей, но поведение должно идти обоими способами.Если элемент удаляется из отдельной очереди, его необходимо удалить из глобальной очереди.

Я бы хотел, чтобы эта структура данных была реализована в Python, в идеале, и, как всегда, скорость имеет первостепенное значение.(больше, чем использование памяти).Любые предложения, с чего начать?

Ответы [ 2 ]

2 голосов
/ 05 мая 2011

Я бы начал с простого моделирования поведения, которое вы ищете в одном классе, выраженного максимально просто. Производительность может быть достигнута позже с помощью итеративной оптимизации, но только при необходимости (она может вам не понадобиться).

Класс ниже делает что-то примерно то, что вы описываете. Очереди - это просто списки, которые названы и хранятся в словаре. Каждое сообщение имеет метку времени и вставляется в начало списка (FIFO). Сообщения получают, проверяя временную метку сообщения в конце списка и выталкивая его до тех пор, пока оно не достигнет сообщения, которое находится ниже порогового значения возраста.

Если вы планируете получить к нему доступ из нескольких потоков, вам потребуется добавить несколько мелкозернистых блокировок, чтобы выжать из них максимальную производительность. Например, метод reap() должен блокировать только 1 очередь за раз, а не блокировать все очереди (синхронизация на уровне метода), поэтому вам также необходимо сохранить блокировку для каждой именованной очереди.

Обновлено - теперь используется глобальный набор сегментов (с отметкой времени, с разрешением 1 секунда), чтобы отслеживать, в каких очередях есть сообщения с того времени. Это уменьшает количество очередей, проверяемых на каждом проходе.

import time
from collections import defaultdict

class QueueMap(object):

    def __init__(self):
        self._expire = defaultdict(lambda *n: defaultdict(int))
        self._store = defaultdict(list)
        self._oldest_key = int(time.time())

    def get_queue(self, name):
        return self._store.get(name, [])

    def pop(self, name):
        queue = self.get_queue(name)
        if queue:
            key, msg = queue.pop()
            self._expire[key][name] -= 1
            return msg
        return None

    def set(self, name, message):
        key = int(time.time())
        # increment count of messages in this bucket/queue
        self._expire[key][name] += 1
        self._store[name].insert(0, (key, message))

    def reap(self, age):
        now = time.time()
        threshold = int(now - age)
        oldest = self._oldest_key

        # iterate over buckets we need to check
        for key in range(oldest, threshold + 1):
            # for each queue with items, expire the oldest ones
            for name, count in self._expire[key].iteritems():
                if count <= 0:
                    continue

                queue = self.get_queue(name)
                while queue:
                    if queue[-1][0] > threshold:
                        break
                    queue.pop()
            del self._expire[key]

        # set oldest_key for next pass
        self._oldest_key = threshold

Использование:

qm = QueueMap()
qm.set('one', 'message 1')
qm.set('one', 'message 2')
qm.set('two', 'message 3')
print qm.pop('one')
print qm.get_queue('one')
print qm.get_queue('two')

# call this on a background thread which sleeps
time.sleep(2)
# reap messages older than 1 second
qm.reap(1)
# queues should be empty now
print qm.get_queue('one')
print qm.get_queue('two')
0 голосов
/ 05 мая 2011

Подумайте о проверке TTL всякий раз, когда вы получаете доступ к очередям, вместо того, чтобы использовать поток для постоянной проверки. Я не уверен, что вы имеете в виду по поводу хэша / карты / dict (что является ключом?), Но как насчет чего-то вроде этого:

import time
class EmptyException(Exception): pass
class TTLQueue(object):
    TTL = 60 # seconds
    def __init__(self):
        self._queue = []

    def push(self, msg):
        self._queue.append((time.time()+self.TTL, msg))

    def pop(self):
        self._queue = [(t, msg) for (t, msg) in self._queue if t > time.time()]
        if len(self._queue) == 0:
            raise EmptyException()
        return self._queue.pop(0)[1]

queues = [TTLQueue(), TTLQueue(), TTLQueue()]  # this could be a dict or set or
                                               #    whatever if I knew what keys
                                               #    you expected
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...