Еще одна проблема производителя / потребителя в Twisted Python - PullRequest
2 голосов
/ 01 марта 2011

Я создаю сервер, который хранит данные ключа / значения поверх Redis, используя Twisted Python.Сервер получает словарь JSON через HTTP, который преобразуется в словарь Python и помещается в буфер.Каждый раз, когда новые данные сохраняются, сервер планирует задачу, которая извлекает один словарь из буфера и записывает каждый кортеж в экземпляр Redis, используя клиент txredis.

class Datastore(Resource):

isLeaf = True

def __init__(self):
    self.clientCreator = protocol.ClientCreator(reactor, Redis)
    d = self.clientCreator.connectTCP(...)
    d.addCallback(self.setRedis)
    self.redis = None
    self.buffer = deque()


def render_POST(self, request):
    try:
        task_id = request.requestHeaders.getRawHeaders('x-task-id')[0]
    except IndexError:
        request.setResponseCode(503)
        return '<html><body>Error reading task_id</body></html>'  

    data = json.loads(request.content.read())
    self.buffer.append((task_id, data))
    reactor.callLater(0, self.write_on_redis)
    return ' '

@defer.inlineCallbacks 
def write_on_redis(self):
    try:
        task_id, dic = self.buffer.pop()
        log.msg('Buffer: %s' % len(self.buffer))
    except IndexError:
        log.msg('buffer empty')
        defer.returnValue(1)

    m = yield self.redis.sismember('DONE', task_id)
    # Simple check
    if m == '1':
        log.msg('%s already stored' % task_id)
    else:
        log.msg('%s unpacking' % task_id)
        s = yield self.redis.sadd('DONE', task_id)

        d = defer.Deferred()
        for k, v in dic.iteritems():
            k = k.encode()
            d.addCallback(self.redis.push, k, v)

        d.callback(None)

По сути, я сталкиваюсь с продюсером / потребителемпроблема между двумя разными соединениями, но я не уверен, что текущая реализация хорошо работает в парадигме Twisted.Я прочитал небольшую документацию по интерфейсам производитель / потребитель в Twisted, но я не уверен, что смогу использовать их в моем случае.Любые критики приветствуются: я пытаюсь понять программирование, управляемое событиями, после слишком многих лет параллелизма потоков.

1 Ответ

2 голосов
/ 02 марта 2011

API-интерфейсы производителя и потребителя в Twisted, IProducer и IConsumer, касаются управления потоком.Кажется, у вас здесь нет управления потоком, вы просто передаете сообщения из одного протокола в другой.

Поскольку управление потоком отсутствует, буфер представляет собой просто дополнительную сложность.Вы можете избавиться от этого, просто передав данные напрямую в метод write_on_redis.Таким образом, write_on_redis не нужно обрабатывать пустой буфер, вам не нужен дополнительный атрибут ресурса, и вы даже можете избавиться от callLater (хотя вы также можете сделать это, даже если вы сохранитебуфер).

Я не знаю, отвечает ли что-нибудь из этого на ваш вопрос.Что касается того, работает ли этот подход «хорошо», вот что я замечаю, просто читая код:

  • Если данные поступают быстрее, чем их принимает redis, ваш список невыполненных заданий может стать сколь угодно большим, в результате чего вам не хватает памяти.Это то, с чем может помочь управление потоком.
  • Без обработки ошибок во время вызова sismember или вызова sadd вы можете потерять задачи, если любой из них завершится неудачей, поскольку вы уже извлекли их израбочий буфер.
  • Выполнение push в качестве обратного вызова для этого Deferred d также означает, что любой неудачный push будет препятствовать отправке остальных данных.Он также передает результат Deferred, возвращаемый push (я предполагаю, что он возвращает Deferred) в качестве первого аргумента для следующего вызова, поэтому, если push более или менее не проигнорирует свой первый аргумент, выне будет подталкивать правильные данные для повторного ввода.

Если вы хотите реализовать управление потоком, вам нужно, чтобы ваш HTTP-сервер проверил длину self.buffer и, возможно, отклонил новую задачу - не добавляя его к self.buffer и возвращая некоторый код ошибки клиенту.Вы по-прежнему не будете использовать IConsumer и IProducer, но это похоже на

.
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...