Изменение рабочих задач python-gearman во время обработки задания - PullRequest
0 голосов
/ 19 марта 2011

Я пытаюсь изменить задачи, доступные на работнике python-gearman во время его рабочего цикла.Моя причина для этого - дать мне немного контроля над моими рабочими процессами и позволить им перезагрузиться из базы данных.Мне нужно, чтобы каждый работник перезагружался через регулярные промежутки времени, но я не хочу просто убивать процессы, и я хочу, чтобы служба была постоянно доступна, что означает, что мне нужно перезагружать пакетами.Таким образом, у меня будет 4 перезагрузки рабочих, в то время как еще 4 рабочих доступны для обработки, а затем перезагрузите следующие 4 рабочих.

Процесс:

  1. Запустите процесс перезагрузки 4 раза.
    1. отменить регистрацию процесса reload
    2. перезагрузить набор данных
    3. зарегистрировать finishReload задачу
    4. return
  2. Повторяйте шаг 1, пока не останется рабочих с зарегистрированной задачей reload.
  3. Запустите задачу finishReload (1), пока не останется рабочих с доступной задачей finishReload.

(1) задача finishReload отменяет регистрацию задачи finishReload, регистрирует задачу reload, а затем возвращает ее.

Теперь проблема, с которой я сталкиваюсь, заключается в том, что задание завершается неудачно, когда яизменить задачи, доступные для рабочего процесса.Там нет сообщений об ошибках или исключений, просто «ОШИБКА» в журнале передач.Вот быстрая программа, которая воспроизводит проблему.

РАБОЧИЙ

import gearman 
def reversify(gmWorker, gmJob): 
        return "".join(gmJob.data[::-1]) 
def strcount(gmWorker, gmJob): 
        gmWorker.unregister_task('reversify')  # problem line 
        return str(len(gmJob.data)) 
worker = gearman.GearmanWorker(['localhost:4730']) 
worker.register_task('reversify', reversify) 
worker.register_task('strcount', strcount) 
while True: 
        worker.work() 

КЛИЕНТ

import gearman 
client = gearman.GearmanClient(['localhost:4730']) 
a = client.submit_job('reversify', 'spam and eggs') 
print a.result 
>>> sgge dna maps 

a = client.submit_job('strcount', 'spam and eggs') 
...

Пожалуйста, дайте мне знать, если есть что-то, что я могу объяснить.

РЕДАКТИРОВАТЬ: Я знаю, что кто-то попросит посмотреть журнал, который я упомянул.Я также отправил этот вопрос в группу gearman в Google, и там есть журнал .

Ответы [ 2 ]

1 голос
/ 21 марта 2011

Это похоже на создание подкласса класса GearmanWorker, и добавление нескольких флагов может обойти эту проблему.Мне нужно разрешить завершить работу, прежде чем я начну вводить новые команды от рабочего к серверу, который, кажется, прерывает текущую работу.Поэтому, если мы перезаписываем функцию on_job_complete, мы можем проверить наличие флага включения / выключения и действовать после того, как мы введем команду send_job_complete.Новая рабочая программа выглядит следующим образом:

РАБОЧИЙ

import gearman

def reversify(gmWorker, gmJob):
        return "".join(gmJob.data[::-1])

def enable_reversify(gmWorker, gmJob):
        myWorker.enableReversify = 1
        return 'OK'

def strcount(gmWorker, gmJob):
        myWorker.enableReversify = -1
        return str(len(gmJob.data))

class myWorker(gearman.GearmanWorker):

        enableReversify = 0 # 0 = do nothing, -1 = turn off, 1 = turn on

        def on_job_complete(self, current_job, job_result):
                self.send_job_complete(current_job, job_result)
                ### check the flag here and enable or disable tasks ###
                if myWorker.enableReversify == -1:
                        self.unregister_task('reversify')
                if myWorker.enableReversify == 1:
                        self.register_task('reversify', reversify)
                myWorker.enableReversify = 0 # reset the flag
                return True

worker = myWorker(['localhost:4730']) 
worker.register_task('reversify', reversify)
worker.register_task('strcount', strcount)
worker.register_task('enableReversify', enable_reversify)

while True:
        worker.work() 
0 голосов
/ 21 марта 2011

На первый взгляд, проблема может заключаться в том, что вы начинаете работу, а затем отменяете регистрацию способности рабочих выполнять эту работу с сервера заданий до ее завершения.

...