Убедиться, что рабочий процесс всегда заканчивается в zeroMQ - PullRequest
3 голосов
/ 26 июля 2011

Я реализую шаблон конвейера с zeroMQ, используя привязки python.

задачи раздаются работникам, которые слушают новые задачи с бесконечным циклом, подобным этому:

    while True:
        socks = dict(self.poller.poll())
        if self.receiver in socks and socks[self.receiver] == zmq.POLLIN:
            msg = self.receiver.recv_unicode(encoding='utf-8')
            self.process(msg)
        if self.hear in socks and socks[self.hear] == zmq.POLLIN:
            msg = self.hear.recv()
            print self.pid,":",  msg
            sys.exit(0)

они завершают работу, когда получают сообщение от узла приемника, подтверждающее получение всех ожидаемых результатов.

однако работник может пропустить такое сообщение и не закончить. Каков лучший способ, чтобы работники всегда заканчивали, когда у них нет возможности узнать (кроме как через уже упомянутое сообщение, что больше нет задач для обработки).

Вот код тестирования, который я написал для проверки статуса рабочих:

#-*- coding:utf-8 -*-
"""
Test module containing tests for all modules of pypln 

"""
import unittest
from servers.ventilator import Ventilator
from subprocess import Popen, PIPE
import time
class testWorkerModules(unittest.TestCase):
    def setUp(self):
        self.nw = 4
        #spawn 4 workers
        self.ws = [Popen(['python', 'workers/dummy_worker.py'], stdout=None) for i in range(self.nw)]
        #spawn a sink
        self.sink = Popen(['python', 'sinks/dummy_sink.py'], stdout=None)
        #start a ventilator
        self.V = Ventilator()
        # wait for workers and sinks to connect
        time.sleep(1)

    def test_send_unicode(self):
        '''
        Pushing unicode strings through workers to sinks.
        '''

        self.V.push_load([u'são joão' for i in xrange(80)])
        time.sleep(1)
        #[p.wait() for p in self.ws]#wait for the workers to terminate
        wsr = [p.poll() for p in self.ws]
        while None in wsr:
            print wsr, [p.pid for p in self.ws if p.poll() == None] #these are the unfinished workers
            time.sleep(0.5)
            wsr = [p.poll() for p in self.ws]
        self.sink.wait()
        self.sink = self.sink.returncode
        self.assertEqual([0]*self.nw, wsr)
        self.assertEqual(0, self.sink)

if __name__ == '__main__':
    unittest.main()

1 Ответ

1 голос
/ 15 января 2013

Все сообщения в конечном итоге заканчиваются сердцебиением. Если вы (как работник, или потребитель, или кто-то еще) обнаружите, что компонент, с которым вам нужно работать, мертв, вы можете либо попытаться соединиться где-нибудь еще, либо убить себя. Поэтому, если вы как работник обнаружите, что раковины больше нет, просто выйдите. Это также означает, что вы можете выйти, даже если приемник все еще там, но соединение разорвано. Но я не уверен, что вы можете сделать больше, возможно, более разумно установить все таймауты ...

...