Python & Redis: лучшие практики приложений / менеджеров - PullRequest
1 голос
/ 17 июня 2011

У меня есть несколько общих вопросов об использовании Python и Redis для создания приложения очереди заданий для выполнения асинхронных команд. Вот код, который я сгенерировал до сих пор:

def queueCmd(cmd):
    r_server.rpush("cmds", cmd)

def printCmdQueue():
    print r_server.lrange("cmds", 0 , -1)

def work():
    print "command being consumed: ", r_server.lpop("cmds")
    return -1

def boom(info):
    print "pop goes the weasel"

if __name__ == '__main__':

    r_server = redis.Redis("localhost")

    queueCmd("ls -la;sleep 10;ls")
    queueCmd("mkdir test; sleep 20")
    queueCmd("ls -la;sleep 10;ls")
    queueCmd("mkdir test; sleep 20")
    queueCmd("ls -la;sleep 10;ls")
    queueCmd("mkdir test; sleep 20")

    printCmdQueue()

    pool = Pool(processes=2)

    print "cnt:", +r_server.llen("cmds")
    #while r_server.llen("cmds") > 0:
    while True:
        pool.apply_async(work, callback=boom)
        if not r_server.lrange("cmds", 0, -1):
        #if r_server.llen("cmds") == 0:
            print "Terminate pool"
            pool.terminate()
            break

    printCmdQueue()

Во-первых, правильно ли я считаю, что если мне нужно связаться с менеджером, я хочу сделать это с помощью обратного вызова? Быстрые примеры, которые я видел в этом примере, сохраняют асинхронный вызов в результате и получают к нему доступ через result.get (timeout = 1). И под связью я имею в виду положить вещи обратно в список redis.

Редактировать: если команда запускается в асинхронном режиме, и у меня истекает время ожидания результата в главном, это время ожидания рабочего или только эта операция внутри менеджера? Если бы только менеджер не мог использовать это для проверки кодов выхода от работника?

Далее этот код выдает следующий вывод:

['ls -la;sleep 10;ls', 'mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20']
command being consumed:  ['mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20']
pop goes the weasel
command being consumed:  ['ls -la;sleep 10;ls', 'mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20']
command being consumed:  mkdir test; sleep 20
pop goes the weasel
pop goes the weasel
command being consumed:  ['ls -la;sleep 10;ls', 'mkdir test; sleep 20']
pop goes the weasel
command being consumed:  ['ls -la;sleep 10;ls', 'mkdir test; sleep 20']
command being consumed:  mkdir test; sleep 20
Terminate pool
command being consumed:  None
 pop goes the weasel
pop goes the weasel
pop goes the weasel
[]

Почему работник хочет использовать несколько cmds за раз, хотя я выкидываю их по одному за раз? На подобном нет, это не всегда хорошо заканчивается и иногда требует Ctrl + C. Чтобы разобраться со своим, я очищаю очередь и снова иду. Я думаю, что это относится к apply_sync () и если выйти из цикла. Мне интересно, должно ли произойти больше на рабочей стороне?

Если я изменю if на закомментированный, я получу:

ValueError: invalid literal for int() with base 10: 'ls -la;sleep 10;ls'

Похоже, это был бы лучший способ проверить, нужно ли мне разбивать, но кажется, что функция время от времени возвращает строковый литерал?

Буду очень признателен за любые рекомендации по улучшению этого. Я просто пытаюсь создать менеджера, который будет похож на сервис / демон на машине с Linux. Он будет использоваться для получения заданий (в настоящее время команд, но, возможно, больше) из списка повторного просмотра и возвращает результаты обратно в список повторного редактирования. Затем в дальнейшем графический интерфейс будет взаимодействовать с этим менеджером для получения статуса очередей и возврата результатов.

Спасибо

EDIT:

Я понял, что был немного глупым. Мне не нужно обращаться к серверу redis от работника, и это приводило к некоторым ошибкам (в частности, ValueError).

Чтобы исправить это, петля теперь:

while not r_server.llen("cmds") == 0:
    cmd = r_server.lpop("cmds")
    pool.apply_async(work, [cmd])

После этих строк я звоню pool.close(). Я использовал os.getpid() и os.getppid(), чтобы проверить, действительно ли у меня бегают несколько детей.

Мне бы все равно было приятно услышать, звучит ли это как хороший способ создания приложения менеджера / рабочего, использующего redis.

1 Ответ

2 голосов
/ 17 июня 2011

Ваша проблема в том, что вы пытаетесь запустить несколько команд одновременно с помощью одного подключения Redis.

Вы ожидаете что-то вроде

Thread 1     Thread 2
LLEN test    
1                            
LPOP test   
command      
             LLEN test
             0

но вы получаете

Thread 1     Thread 2
LLEN test    
1                            
LPOP test   
             LLEN test
             command
0

Результаты возвращаются в том же порядке, но ничто не связывает поток или команду с конкретным результатом. Отдельные соединения redis не являются поточно-ориентированными - вам понадобится одно для каждого рабочего потока.

Подобные проблемы также могут возникать, если вы неправильно используете конвейеризацию - он предназначен только для сценариев записи, таких как добавление большого количества элементов в список, где вы можете повысить производительность, предполагая, что LPUSH успешен, а не ожидая, когда сервер сообщит вам об этом. удалось после каждого элемента. Redis по-прежнему будет возвращать результаты, но они не обязательно будут результатами последней отправленной команды.

Кроме этого, базовый подход разумен. Вы можете сделать несколько улучшений:

  • Вместо проверки длины просто используйте неблокирующий LPOP - если он возвращает ноль, список пуст
  • Добавьте таймер, чтобы, если список пуст, он ожидал, а не просто набрал другую команду.
  • Включить проверку отмены в условие цикла while
  • Обрабатывать ошибки подключения - я использую внешний цикл, настроенный таким образом, чтобы в случае сбоя подключения рабочий попытался повторно подключиться (в основном перезапустить main ) для разумного числа попыток, прежде чем полностью завершить рабочий процесс.
...