Вызов scheduler.multiprocessing.get в отдельном процессе в dask - PullRequest
0 голосов
/ 26 июня 2018

Я тренирую нейронную сеть с большими текстовыми корпусами. Каждый текст генерирует довольно большую матрицу, потому что я использую сверточную модель. Так как мои данные не помещаются в мою все еще большую память, я пытаюсь их воспроизвести и использовать keras.models fit_generator .

Для подачи кераса у меня есть конвейер, состоящий из разных этапов предварительной обработки, которые я собираю в сумку с множеством перегородок. Сумка для чтения читает файл на диске.

Даже если dask не обрабатывает итерацию умным способом (он просто вычисляет () и приводит к результату, который в моем случае взрывает память), я должен был использовать что-то вроде этого:

def compute_partition_iter(collection, **kwargs):
    """A utility to compute a collection items after items
    """
    get = kwargs.pop("get", None) or _globals['get']
    if get is None:
        get = collection.__dask_scheduler__
    postcompute_func, postcompute_args = collection.__dask_postcompute__()
    dsk = collection.__dask_graph__()
    for key in collection.__dask_keys__():
        yield from f([partition], *args)

Это вычисляет разделы один за другим и возвращает элементы, вычисляя следующий раздел, когда мы пересекаем границу раздела.

У этого подхода есть проблема: только когда мы ударяем последний элемент из раздела, мы провоцируем вычисление следующих элементов, приводя к отставанию до следующего элемента. В этом лагере Керас застопорился, и мы теряем драгоценное время!

Итак, я представляю, как запустить вышеупомянутый compute_partition_iter в отдельном процессе благодаря multiprocessing.Pool, подающему разделы в Queue, скажем, с 2 слотами, так что в генераторе у меня не всегда будет готов еще один раздел .

Но, похоже, это не поддерживается dask.bag. Я недостаточно глубоко погрузился в код, но похоже, что используются некоторые асинхронные методы, или я не знаю, что.

Вот воспроизводимый код проблемы.

Сначала код, который работает, используя простой диапазон.

import multiprocessing
import time


def put_q(n, q):
    for i in range(n):
        print(i, "<-")
        q.put(i)
    q.put(None)

q = multiprocessing.Queue(2)
with multiprocessing.Pool(1, put_q, (4, q)) as pool:
    i = True
    while i is not None:
        print("zzz")
        time.sleep(.5)
        i = q.get()
        if i is None:
            break
        print("-> ", i)

Это выводит

0 <-
1 <-
2 <-
zzz
3 <-
->  0
zzz
->  1
zzz
->  2
zzz
->  3
zzz

вы можете видеть, что, как и ожидалось, элементы рассчитаны в ожидании, и все в порядке.

Теперь давайте заменим диапазон на dask.bag:

import multiprocessing
import time

import dask.bag


def put_q(n, q):
    for i in dask.bag.from_sequence(range(n), npartitions=2):
        print(i, "<-")
        q.put(i)
    q.put(None)

q = multiprocessing.Queue(5)
with multiprocessing.Pool(1, put_q, (4, q)) as pool:
    i = True
    while i is not None:
        print("zzz")
        time.sleep(.5)
        i = q.get()
        if i is None:
            break
        print("-> ", i)

В ноутбуке Jupyter он бесконечно поднимается:

Process ForkPoolWorker-71:
Traceback (most recent call last):
  File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
    self.run()
  File "/usr/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.5/multiprocessing/pool.py", line 103, in worker
    initializer(*initargs)
  File "<ipython-input-3-e1e9ef9354a0>", line 8, in put_q
    for i in dask.bag.from_sequence(range(n), npartitions=2):
  File "/usr/local/lib/python3.5/dist-packages/dask/bag/core.py", line 1190, in __iter__
    return iter(self.compute())
  File "/usr/local/lib/python3.5/dist-packages/dask/base.py", line 154, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/usr/local/lib/python3.5/dist-packages/dask/base.py", line 407, in compute
    results = get(dsk, keys, **kwargs)
  File "/usr/local/lib/python3.5/dist-packages/dask/multiprocessing.py", line 152, in get
    initializer=initialize_worker_process)
  File "/usr/lib/python3.5/multiprocessing/context.py", line 118, in Pool
    context=self.get_context())
  File "/usr/lib/python3.5/multiprocessing/pool.py", line 168, in __init__
    self._repopulate_pool()
  File "/usr/lib/python3.5/multiprocessing/pool.py", line 233, in _repopulate_pool
    w.start()
  File "/usr/lib/python3.5/multiprocessing/process.py", line 103, in start
    'daemonic processes are not allowed to have children'
AssertionError: daemonic processes are not allowed to have children

пока основной процесс остановлен, ожидая элементов в очереди.

Я также пытался использовать кластер ipyparallel, но в этом случае основной процесс просто останавливается (никаких следов исключения).

Кто-нибудь знает правильный способ сделать это?

Есть ли способ запустить scheduler.get параллельно с моим основным кодом?

1 Ответ

0 голосов
/ 27 июня 2018

Наконец-то я должен был взглянуть на исключение поближе!

Stackoverflow дал мне решение: Python Process Pool не демонический?

Фактически, поскольку планировщик пакетов использует Pool, он не может быть вызван внутри процесса, порожденного пулом. Решение в моем случае - просто использовать потоки. ( Обратите внимание , что ошибка и ее решение зависят от используемого вами планировщика).

Итак, я заменил multiprocessing.Pool на multiprocessing.pool.ThreadPool, и он работает как шарм, либо в обычном ноутбуке, либо когда использует ipyparallel .

Итак, все выглядит так:

import queue
from multiprocessing.pool import ThreadPool
import time

import dask.bag


def put_q(n, q):
    b = dask.bag.from_sequence(range(n), npartitions=3)
    for i in b:
        print(i, "<-")
        q.put(i)
    q.put(None)

q = queue.Queue(2)
with ThreadPool(1, put_q, (6, q)) as pool:
    i = True
    while i is not None:
        print("zzz")
        time.sleep(.5)
        i = q.get()
        if i is None:
            break
        print("-> ", i)

Какие выходы:

zzz
0 <-
1 <-
2 <-
->  0
zzz
3 <-
->  1
zzz
4 <-
-> 5 <-
 2
zzz
->  3
zzz
->  4
zzz
->  5
zzz
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...