Как я могу использовать Pool вместо Process & Pipe - PullRequest
0 голосов
/ 22 января 2020

У меня есть первый класс tree, который выполняет некоторые вычисления, и другой класс forest, который содержит деревья и выполняет больше вычислений.

Класс tree использует механизм соединения Pipe() для подгонки дерева: результирующие узлы и листья должны быть recv() деревом.

Класс forest, после подбора определенного количества деревьев, также использует механизм соединения Pipe() для вычисления других вещей в зависимости от деревьев, которые также recv() по лесу.

Когда число деревьев в лесу растет , я работаю в OSError : Too many open files и не могу увеличить ulimit, так как у меня нет доступа root на машине, где выполняется код.

Я предполагаю, что использование Pool() не вызовет этих проблем. Но я не понимаю, как я могу передавать информацию, используя Pool между дочерними и основными процессами. На данный момент я использую идиому, которая выглядит следующим образом:

from multiprocessing import Pipe, Process


class tree:
    def __init__(self):
        self.random_number = 5 # Choosed by fair roll dice

    def start_fit(self):
        self.parent_conn, child_conn = Pipe(duplex=False)
        self.process = tree_fitter(self.random_number, child_conn)
        self.process.start()
        return None

    def join_fit(self):
        self.fitting_result = self.parent_conn.recv()
        self.process.join()
        self.parent_conn.close()
        del self.parent_conn
        del self.process
        return None

class tree_fitter(Process):

    def __init__(self, random_number, connection):
        self.connection = connection
        self.new_randm_number = 2 # Choosed by fair roll dice

    def run(self):
        result = self.new_randm_number * random_number
        self.connection.send(result)
        self.connection.close()
        return 0


class forest:
    def __init__(self,n_trees):
        self.n = n_trees  # Number of trees
        self.trees = [tree() for i in range(0,self.n)]

    def compute_more_complicate_things(self):
        processes = []
        parents = []
        results = []

        for i in range(0,self.n):
            self.trees[i].start_fit()

        for i in range(0,self.n):
            self.trees[i].join_fit()
            parent_connection, child_connection = Pipe(duplex=False)
            parents.append(parent_connection)
            processes.append(forest_cumulative_mean(self, i,child_connection))

        for i in range(0,self.n):
            processes[i].start()

        for i in range(0,self.n):
            results.append(parents[i].recv())
            processes[i].join()
            parents[i].close()
            print(i, end=" ")

        del processes
        del parents
        del results
        self.cumusum_of_result = results


class forest_cumulative_mean(Process):

    def __init__(self, forest, i, connection):
        self.trees = forest.trees[:i]
        self.connection = connection

    def run(self):
        result = sum([t.result for t in self.trees])
        self.connection.send(result)
        self.connection.close()
        return 0

1 ° Как я могу преобразовать его в идиому Pool, не меняя слишком сильно структуру моего кода (это, конечно, фиктивная Например (я не пытался запустить его), у меня гораздо больше кода вокруг него ..).

2 ° Будет ли это устранять ошибку, связанную с охватом слишком большого количества процессов?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...