Я бы начал с простого моделирования поведения, которое вы ищете в одном классе, выраженного максимально просто. Производительность может быть достигнута позже с помощью итеративной оптимизации, но только при необходимости (она может вам не понадобиться).
Класс ниже делает что-то примерно то, что вы описываете. Очереди - это просто списки, которые названы и хранятся в словаре. Каждое сообщение имеет метку времени и вставляется в начало списка (FIFO). Сообщения получают, проверяя временную метку сообщения в конце списка и выталкивая его до тех пор, пока оно не достигнет сообщения, которое находится ниже порогового значения возраста.
Если вы планируете получить к нему доступ из нескольких потоков, вам потребуется добавить несколько мелкозернистых блокировок, чтобы выжать из них максимальную производительность. Например, метод reap()
должен блокировать только 1 очередь за раз, а не блокировать все очереди (синхронизация на уровне метода), поэтому вам также необходимо сохранить блокировку для каждой именованной очереди.
Обновлено - теперь используется глобальный набор сегментов (с отметкой времени, с разрешением 1 секунда), чтобы отслеживать, в каких очередях есть сообщения с того времени. Это уменьшает количество очередей, проверяемых на каждом проходе.
import time
from collections import defaultdict
class QueueMap(object):
def __init__(self):
self._expire = defaultdict(lambda *n: defaultdict(int))
self._store = defaultdict(list)
self._oldest_key = int(time.time())
def get_queue(self, name):
return self._store.get(name, [])
def pop(self, name):
queue = self.get_queue(name)
if queue:
key, msg = queue.pop()
self._expire[key][name] -= 1
return msg
return None
def set(self, name, message):
key = int(time.time())
# increment count of messages in this bucket/queue
self._expire[key][name] += 1
self._store[name].insert(0, (key, message))
def reap(self, age):
now = time.time()
threshold = int(now - age)
oldest = self._oldest_key
# iterate over buckets we need to check
for key in range(oldest, threshold + 1):
# for each queue with items, expire the oldest ones
for name, count in self._expire[key].iteritems():
if count <= 0:
continue
queue = self.get_queue(name)
while queue:
if queue[-1][0] > threshold:
break
queue.pop()
del self._expire[key]
# set oldest_key for next pass
self._oldest_key = threshold
Использование:
qm = QueueMap()
qm.set('one', 'message 1')
qm.set('one', 'message 2')
qm.set('two', 'message 3')
print qm.pop('one')
print qm.get_queue('one')
print qm.get_queue('two')
# call this on a background thread which sleeps
time.sleep(2)
# reap messages older than 1 second
qm.reap(1)
# queues should be empty now
print qm.get_queue('one')
print qm.get_queue('two')