Использование многопроцессорных каналов Python - PullRequest
12 голосов
/ 29 октября 2011

Я пытаюсь написать класс, который будет вычислять контрольные суммы, используя несколько процессов, тем самым используя преимущества нескольких ядер. У меня есть довольно простой класс для этого, и он прекрасно работает при выполнении простого случая. Но всякий раз, когда я создаю два или более экземпляров класса, рабочий никогда не выходит. Кажется, что он никогда не получит сообщение о том, что канал был закрыт родителем.

Весь код можно найти ниже. Сначала я вычисляю контрольные суммы md5 и sha1 по отдельности, что работает, а затем я пытаюсь выполнить вычисления параллельно, а затем программа блокируется, когда приходит время закрыть канал.

Что здесь происходит? Почему трубы не работают так, как я ожидаю? Думаю, я мог бы обойти эту проблему, отправив сообщение «Стоп» в очередь и заставить ребенка выйти таким образом, но мне бы очень хотелось узнать, почему это не работает так, как есть.

import multiprocessing
import hashlib

class ChecksumPipe(multiprocessing.Process):
    def __init__(self, csname):
        multiprocessing.Process.__init__(self, name = csname)
        self.summer = eval("hashlib.%s()" % csname)
        self.child_conn, self.parent_conn = multiprocessing.Pipe(duplex = False)
        self.result_queue = multiprocessing.Queue(1)
        self.daemon = True
        self.start()
        self.child_conn.close() # This is the parent. Close the unused end.

    def run(self):
        self.parent_conn.close() # This is the child. Close unused end.
        while True:
            try:
                print "Waiting for more data...", self
                block = self.child_conn.recv_bytes()
                print "Got some data...", self
            except EOFError:
                print "Finished work", self
                break
            self.summer.update(block)
        self.result_queue.put(self.summer.hexdigest())
        self.result_queue.close()
        self.child_conn.close()

    def update(self, block):
        self.parent_conn.send_bytes(block)

    def hexdigest(self):
        self.parent_conn.close()
        return self.result_queue.get()


def main():
    # Calculating the first checksum works
    md5 = ChecksumPipe("md5")
    md5.update("hello")
    print "md5 is", md5.hexdigest()

    # Calculating the second checksum works
    sha1 = ChecksumPipe("sha1")
    sha1.update("hello")
    print "sha1 is", sha1.hexdigest()

    # Calculating both checksums in parallel causes a lockup!
    md5, sha1 = ChecksumPipe("md5"), ChecksumPipe("sha1")
    md5.update("hello")
    sha1.update("hello")
    print "md5 and sha1 is", md5.hexdigest(), sha1.hexdigest() # Lockup here!

main()

PS. Эта проблема была решена Вот рабочая версия приведенного выше кода, если кому-то интересно:

import multiprocessing
import hashlib

class ChecksumPipe(multiprocessing.Process):

    all_open_parent_conns = []

    def __init__(self, csname):
        multiprocessing.Process.__init__(self, name = csname)
        self.summer = eval("hashlib.%s()" % csname)
        self.child_conn, self.parent_conn = multiprocessing.Pipe(duplex = False)
        ChecksumPipe.all_open_parent_conns.append(self.parent_conn)
        self.result_queue = multiprocessing.Queue(1)
        self.daemon = True
        self.start()
        self.child_conn.close() # This is the parent. Close the unused end.

    def run(self):
        for conn in ChecksumPipe.all_open_parent_conns:
            conn.close() # This is the child. Close unused ends.
        while True:
            try:
                print "Waiting for more data...", self
                block = self.child_conn.recv_bytes()
                print "Got some data...", self
            except EOFError:
                print "Finished work", self
                break
            self.summer.update(block)
        self.result_queue.put(self.summer.hexdigest())
        self.result_queue.close()
        self.child_conn.close()

    def update(self, block):
        self.parent_conn.send_bytes(block)

    def hexdigest(self):
        self.parent_conn.close()
        return self.result_queue.get()

def main():
    # Calculating the first checksum works
    md5 = ChecksumPipe("md5")
    md5.update("hello")
    print "md5 is", md5.hexdigest()

    # Calculating the second checksum works
    sha1 = ChecksumPipe("sha1")
    sha1.update("hello")
    print "sha1 is", sha1.hexdigest()

    # Calculating both checksums also works fine now
    md5, sha1 = ChecksumPipe("md5"), ChecksumPipe("sha1")
    md5.update("hello")
    sha1.update("hello")
    print "md5 and sha1 is", md5.hexdigest(), sha1.hexdigest()

main()

1 Ответ

7 голосов
/ 30 октября 2011

Да, это действительно удивительное поведение.

Однако, если вы посмотрите на вывод lsof для двух параллельных дочерних процессов, легко заметить, что у второго дочернего процесса открыто больше файловых дескрипторов.

Что происходит, когда два параллельных дочерних процесса запускаются, второй дочерний объект наследует каналы родителя, так что когда родитель вызывает self.parent_conn.close(), у второго дочернего элемента все еще остается открытый дескриптор файла канала, так что описание файла канала не закрывается в ядре (счетчик ссылок больше 0), в результате self.child_conn.recv_bytes() в первом параллельном дочернем процессе никогда не read() s EOF и EOFError никогда не генерируются.

Вам может потребоваться отправить явное сообщение о завершении работы, а не просто закрывать файловые дескрипторы, потому что кажется, что существует небольшой контроль над тем, какие файловые дескрипторы распределяются между какими процессами (отсутствует флаг дескриптора файла с закрытой вилкой). 1013 *

...