Многопоточность в Python на основе объектного потока (т.е. потока только с одним элементом) - PullRequest
0 голосов
/ 10 апреля 2019

Мне интересно, как я могу автоматически распараллелить программу на Python, основанную на объектном потоке (с учетом определения потока, не нужно анализировать) ?Под потоком объектов я подразумеваю, что это как поток, но вместо непрерывного потока он вводит большой объект и выводит большой объект (например, numpy.ndarray).

Это особенно полезно в numpy, потому что numpyвыполняет большую векторизацию, а не объединяет циклы.Вычисление может принимать входные данные от нескольких предшествующих вычислений, а его выходные данные могут использоваться несколькими последующими вычислениями.Эти предшественники и преемники могут быть вычислены (отдельно) одновременно.

Примером может быть:

a  = np.random.rand(1024)
b1 = a.mean()
b2 = c.std()
c  = (a - b1) / b2

Здесь b1 (среднее) и b2 std могут быть вычисленыв то же время.Для больших вычислений это очень полезно, поскольку не все операции в numpy являются многопоточными.

Было бы полезно, если бы пакет мог автоматически определять порядок вычислений, назначать вычисления ядрам ивозможно, избегайте использования слишком большого количества памяти, ведущей к MemoryError или убийце OOM.Использование numpy обойдет ограничение GIL в Python, поэтому многопоточность является хорошим выбором.

С одной стороны, Я ищу какой-то разработанный пакет или решение для этого .С другой стороны, я сам написал пакет, который пытается решить эту проблему.Код прост:

from threading import Thread, Event
class Require:
    def __init__(s, f_worker_map):
        s.d = dict(f_worker_map)
    def get(s, f, arg):
        s.cache = {(f, arg): [Event(), None, 0]}
        s._get(f, arg).join()
        return s.cache[f, arg][1]
    def _get(s, f, arg):
        require = f(*arg)
        for i in require:
            t = s.cache.get(i, None)
            if t:
                t[2] += 1
            else:
                t = [Event(), None, 0]
                s.cache[i] = t
                s._get(*i)
        t = Thread(target=s._thread, args=(s.d[f], arg, require, (f, arg)))
        t.start()
        return t
    def _thread(s, f, arg, require, id):
        for i,v in enumerate(require):
            s.cache[v][0].wait()
            require[i] = s.cache[v][1]
            if s.cache[v][2] < 1:
                del s.cache[v]
            else:
                s.cache[v][2] -= 1
        s.cache[id][1] = f(*arg, *require)
        s.cache[id][0].set()

Здесь f - это функция, возвращающая «зависимость» в виде list из f-arg tuple, а worker - фактическая функция вычисления, которая обрабатываетрезультат зависимостей и выходных данных.

Этот код работает, но не выполняет управление памятью, и имеет некоторую проблему состояния гонки в отношении счетчика ссылок кэша s.cache[id][2].Состояние гонки не является серьезной проблемой, но управление памятью.Это оказывается очень сложной проблемой, поскольку она должна минимизировать пиковое использование памяти, решая порядок вычислений.Мне интересно, есть ли теория или практика по этому поводу?

Спасибо!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...