Массив памяти Python, общий для нескольких потоков (например, memcached в пространстве процесса) - PullRequest
2 голосов
/ 18 января 2011

Итак, у меня есть многопоточная программа, которая в двух словах загружает веб-страницы, обрабатывает их и сохраняет результаты. Правила и все, что он использует для обработки веб-страниц, хранится в базе данных. Первоначально база данных была абсолютно забита (для обработки каждой веб-страницы требовалось 1-50 запросов к базе данных). Шаг 1 кэшировал эту информацию в memcached (и если для домена не существует правил, он просто возвращает пустую строку ""), что является огромным улучшением по сравнению с ударом по базе данных 1-50 раз для каждого обработанного элемента. Но я все еще работаю над memcached, который добавляет задержку в сети (1-50 циклов для каждого обработанного элемента, он быстро складывается даже в локальном Ethernet).

Так что я бы хотел кешировать результаты в массиве в пространстве процесса, в основном реплицируя memcached в памяти. С точки зрения данных это не так уж плохо, я собираюсь использовать набор Python, чтобы в основном реплицировать хранилище ключей: значений (достаточно просто).

Но вот в чем дело: обычно несколько потоков попадают на один и тот же сайт и нуждаются в одинаковом наборе правил, поэтому я бы хотел предотвратить проблему с громадным стадом (т. Е. 10 потоков пытаются получить правила для example.com). , что, если не в локальном кеше, а не в memcached, приведет к попаданию в базу данных, не очень сильно, но немного).

  1. Установить поток ("update_thread") для обновления массива в памяти, иметь рабочую очередь, если поток не может получить правила для домена из локального кэша, он записывает домен в рабочую очередь и sleep для части секунды, затем пытается снова, спать и пробовать снова, пока кэш локальной памяти не имеет или пустую строку "" или набор правил для использования. Поток "update_thread" считывает рабочую очередь и получает правила из memcached или, если не существует, из базы данных и записывает их в memcached и локальный кеш (и, если нет правил, выдает пустую строку "" в значении). Недостатками этого являются добавление потока; больше разногласий по GIL, небольшие задержки (нам нужно дождаться запуска update_thread, поскольку мы находимся во власти GIL). Плюс добавленная сложность другого потока и рабочей очереди. Только update_thread может записывать в массив кеша в памяти, поэтому нет необходимости блокировать / etc.

  2. Мы используем блокировку для управления доступом для записи в массив кеша в памяти. Если поток не может найти набор правил, он пытается получить набор правил из memcached, если его нет, он попадает в базу данных, как только он находит правила, он блокирует массив памяти и записывает правила (или пустую строку "" для значения) в кэш-память. Недостаток: у нас все еще может быть проблема громового стада, но это можно компенсировать, написав специальное значение, такое как «получение правил, просто подождите секунду», для домена, который заставит другие потоки ждать, например.

Может кто-нибудь еще подумать о каких-либо других решениях или прокомментировать два решения, которые я предложил? Я подозреваю, что пойду с номером 2, так как блокировка + «получение правил, просто подождите секунду» кажется проще, чем добавление потока и рабочей очереди. Или я упускаю какое-то ослепительно очевидное и простое решение?

Ответы [ 2 ]

1 голос
/ 18 января 2011

Если я правильно понял, проблема в том, что несколько потоков имеют тенденцию извлекать одни и те же данные из memcached одновременно. Вы хотели бы скоординировать потоки так, чтобы один поток извлекал данные, в то время как другие ожидают и делятся данными после их поступления.

Создайте класс-оболочку для объектов, которые вы хотите кэшировать. Поместите пустую оболочку в кэш, прежде чем начинать извлекать значение по сети. Если другой поток ищет те же данные, он будет блокироваться до получения значения.

Вот объект-обертка:

class PendingValue(object):
    def __init__(self):
        self._event = threading.Event()

    def get(self):
        self._event.wait()
        return self._value

    def set(self, value):
        self._value = value
        self._event.set()

и вот кеш:

class Cache(object):
    def __init__(self):
        self._dict = {}
        self._lock = threading.Lock()

    def __getitem__(self, key):
        self._lock.acquire()
        try:
            pv = self._dict[key]
            self._lock.release()
            return pv.get()
        except KeyError: #key not in cache
            pv = PendingValue()
            self._dict[key] = pv
            self._lock.release()
            value = retrieve_value_from_external_source()
            pv.set(value)
            return value
0 голосов
/ 18 января 2011

Это довольно большой переход от отдельного контролируемого пространства процессов к разделяемой памяти и некоторому мьютексу.

Если ваша проблема заключается в задержке, вызванной пятидесяти обходами для обработки чего-либо, почему бы вам не использовать multiget и всегда просто делать одно?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...