Я тренирую нейронную сеть с большими текстовыми корпусами. Каждый текст генерирует довольно большую матрицу, потому что я использую сверточную модель. Так как мои данные не помещаются в мою все еще большую память, я пытаюсь их воспроизвести и использовать keras.models fit_generator .
Для подачи кераса у меня есть конвейер, состоящий из разных этапов предварительной обработки, которые я собираю в сумку с множеством перегородок. Сумка для чтения читает файл на диске.
Даже если dask не обрабатывает итерацию умным способом (он просто вычисляет () и приводит к результату, который в моем случае взрывает память), я должен был использовать что-то вроде этого:
def compute_partition_iter(collection, **kwargs):
"""A utility to compute a collection items after items
"""
get = kwargs.pop("get", None) or _globals['get']
if get is None:
get = collection.__dask_scheduler__
postcompute_func, postcompute_args = collection.__dask_postcompute__()
dsk = collection.__dask_graph__()
for key in collection.__dask_keys__():
yield from f([partition], *args)
Это вычисляет разделы один за другим и возвращает элементы, вычисляя следующий раздел, когда мы пересекаем границу раздела.
У этого подхода есть проблема: только когда мы ударяем последний элемент из раздела, мы провоцируем вычисление следующих элементов, приводя к отставанию до следующего элемента. В этом лагере Керас застопорился, и мы теряем драгоценное время!
Итак, я представляю, как запустить вышеупомянутый compute_partition_iter
в отдельном процессе благодаря multiprocessing.Pool
, подающему разделы в Queue
, скажем, с 2 слотами, так что в генераторе у меня не всегда будет готов еще один раздел .
Но, похоже, это не поддерживается dask.bag
. Я недостаточно глубоко погрузился в код, но похоже, что используются некоторые асинхронные методы, или я не знаю, что.
Вот воспроизводимый код проблемы.
Сначала код, который работает, используя простой диапазон.
import multiprocessing
import time
def put_q(n, q):
for i in range(n):
print(i, "<-")
q.put(i)
q.put(None)
q = multiprocessing.Queue(2)
with multiprocessing.Pool(1, put_q, (4, q)) as pool:
i = True
while i is not None:
print("zzz")
time.sleep(.5)
i = q.get()
if i is None:
break
print("-> ", i)
Это выводит
0 <-
1 <-
2 <-
zzz
3 <-
-> 0
zzz
-> 1
zzz
-> 2
zzz
-> 3
zzz
вы можете видеть, что, как и ожидалось, элементы рассчитаны в ожидании, и все в порядке.
Теперь давайте заменим диапазон на dask.bag
:
import multiprocessing
import time
import dask.bag
def put_q(n, q):
for i in dask.bag.from_sequence(range(n), npartitions=2):
print(i, "<-")
q.put(i)
q.put(None)
q = multiprocessing.Queue(5)
with multiprocessing.Pool(1, put_q, (4, q)) as pool:
i = True
while i is not None:
print("zzz")
time.sleep(.5)
i = q.get()
if i is None:
break
print("-> ", i)
В ноутбуке Jupyter он бесконечно поднимается:
Process ForkPoolWorker-71:
Traceback (most recent call last):
File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
self.run()
File "/usr/lib/python3.5/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "/usr/lib/python3.5/multiprocessing/pool.py", line 103, in worker
initializer(*initargs)
File "<ipython-input-3-e1e9ef9354a0>", line 8, in put_q
for i in dask.bag.from_sequence(range(n), npartitions=2):
File "/usr/local/lib/python3.5/dist-packages/dask/bag/core.py", line 1190, in __iter__
return iter(self.compute())
File "/usr/local/lib/python3.5/dist-packages/dask/base.py", line 154, in compute
(result,) = compute(self, traverse=False, **kwargs)
File "/usr/local/lib/python3.5/dist-packages/dask/base.py", line 407, in compute
results = get(dsk, keys, **kwargs)
File "/usr/local/lib/python3.5/dist-packages/dask/multiprocessing.py", line 152, in get
initializer=initialize_worker_process)
File "/usr/lib/python3.5/multiprocessing/context.py", line 118, in Pool
context=self.get_context())
File "/usr/lib/python3.5/multiprocessing/pool.py", line 168, in __init__
self._repopulate_pool()
File "/usr/lib/python3.5/multiprocessing/pool.py", line 233, in _repopulate_pool
w.start()
File "/usr/lib/python3.5/multiprocessing/process.py", line 103, in start
'daemonic processes are not allowed to have children'
AssertionError: daemonic processes are not allowed to have children
пока основной процесс остановлен, ожидая элементов в очереди.
Я также пытался использовать кластер ipyparallel, но в этом случае основной процесс просто останавливается (никаких следов исключения).
Кто-нибудь знает правильный способ сделать это?
Есть ли способ запустить scheduler.get параллельно с моим основным кодом?