Многопоточные рекурсивные функции в Python 3 - PullRequest
2 голосов
/ 13 марта 2019

Справочная информация:

Я работаю над сценарием обнаружения Telecoms Network, который запускается crontab на linux.Он использует начальный файл исходных сетевых узлов, он подключается к ним, получает всех соседей, а затем подключается к этим соседям и так далее, и так далее.Типичная рекурсия.Для ускорения всего этого я использовал многопоточность с семафором, поэтому у меня было только определенное количество запущенных потоков, но огромное количество запущенных потоков, ожидающих.В какой-то момент я столкнулся с максимальным пределом потока в linux, поэтому сценарий не смог запустить новые потоки.

Проблема:

В поисках проекта, который позволил бы многопоточность этогорекурсия показалась мне случаем сценария мульти гибридного производителя / потребителя.Несколько потребителей также производят.

Потребитель берет элемент из очереди, потребляет его и, если есть какие-либо результаты, возвращает каждый результат снова в очередь.

Чтобы сделать его действительно приятным, я бы хотелсоздать шаблон дизайна, который можно использовать для любого типа рекурсивной функции, другими словами, с любыми аргументами и kwargs.

Что я ожидаю от такой функции, так это то, что я передаю ей любую комбинацию переменных (args, kwargs)что ему нужно, и я получаю в ответ список аргументов, чтобы я мог передать его снова в других рекурсиях.

Вопросы:

  • Есть ли лучший способ обработкиполучить аргументы, kwargs из функции возврата, кроме той, которую я использовал?Я в основном создал кортеж (args, kwargs) (tuple (), dict ()), который возвращает func, и Worker разделяет его на args, kwargs впоследствии.Идеально было бы вообще не создавать этот кортеж.

  • Не могли бы вы дать какие-нибудь другие советы по улучшению этого дизайна?

СпасибоС уважением!


Текущий код:

#!/usr/bin/env python3

from queue import Queue, Empty
from threading import Thread
from time import sleep
from random import choice, random


class RecursiveWorkerThread(Thread):

    def __init__(self, name, pool):
        Thread.__init__(self)
        self.name = name
        self.pool = pool
        self.tasks = pool.tasks
        self.POISON = pool.POISON
        self.daemon = False
        self.result = None
        self.start()

    def run(self):
        print(f'WORKER {self.name} - is awake.')
        while True:
            if not self.tasks.empty():
                # take task from queue
                try:
                    func, f_args, f_kwargs = self.tasks.get(timeout=1)

                    # check for POISON
                    if func is self.POISON:
                        print(f'WORKER {self.name} - POISON found. Sending it back to queue. Dying...')
                        self.pool.add_task(self.POISON)
                        break

                    # try to perform the task on arguments and get result
                    try:
                        self.result = func(*f_args, **f_kwargs)
                    except Exception as e:
                        print(e)

                    # recursive part, add results to queue
                    print(f'WORKER {self.name} - FUNC: ({func.__name__}) IN: (args: {f_args}, kwargs: {f_kwargs}) OUT: ({self.result}).')
                    for n_args, n_kwargs in self.result:
                        self.pool.add_task(func, *n_args, **n_kwargs)

                    # mark one task done in queue
                    self.tasks.task_done()
                except Empty:
                    pass
            sleep(random())


class RecursiveThreadPool:

    def __init__(self, num_threads):
        self.tasks = Queue()
        self.POISON = object()

        print('\nTHREAD_POOL - initialized.\nTHREAD_POOL - waking up WORKERS.')

        self.workers = [RecursiveWorkerThread(name=str(num), pool=self) for num in range(num_threads)]

    def add_task(self, func, *args, **kwargs):
        if func is not self.POISON:
            print(f'THREAD_POOL - task received: [func: ({func.__name__}), args: ({args}), kwargs:({kwargs})]')
        else:
            print('THREAD_POOL - task received: POISON.')
        self.tasks.put((func, args, kwargs))

    def wait_for_completion(self):
        print('\nTHREAD_POOL - waiting for all tasks to be completed.')

        self.tasks.join()

        print('\nTHREAD_POOL - all tasks have been completed.\nTHREAD_POOL - sending POISON to queue.')

        self.add_task(self.POISON)

        print('THREAD_POOL - waiting for WORKERS to die.')

        for worker in self.workers:
            worker.join()

        print('\nTHREAD_POOL - all WORKERS are dead.\nTHREAD_POOL - FINISHED.')


# Test part
if __name__ == '__main__':

    percentage = [True] * 2 + [False] * 8

    # example function
    def get_subnodes(node):
        maximum_subnodes = 2
        sleep(5 * random())
        result_list = list()
        for i in range(maximum_subnodes):
            # apply chance on every possible subnode
            if choice(percentage):
                new_node = node + '.' + str(i)
                # create single result
                args = tuple()
                kwargs = dict({'node': new_node})
                # append it to the result list
                result_list.append((args, kwargs))
        return result_list


    # 1) Init a Thread pool with the desired number of worker threads
    THREAD_POOL = RecursiveThreadPool(10)

    # 2) Put initial data into queue
    initial_nodes = 10
    for root_node in [str(i) for i in range(initial_nodes)]:
        THREAD_POOL.add_task(get_subnodes, node=root_node)

    # 3) Wait for completion
    THREAD_POOL.wait_for_completion()
...