Я использую Dask, распределенный по большому университетскому кластеру SLURM.У меня смущающая параллельная проблема, в которой я читаю на изображении и выполняю некую непонятную обработку.
Я запускаю кластер
from dask_jobqueue import SLURMCluster
from dask.distributed import Client
from dask import delayed
num_workers=10
#job args
extra_args=[
"--error=/home/b.weinstein/logs/dask-worker-%j.err",
"--account=ewhite",
"--output=/home/b.weinstein/logs/dask-worker-%j.out"
]
cluster = SLURMCluster(
processes=1,
queue='hpg2-compute',
cores=1,
memory='20GB',
walltime='48:00:00',
job_extra=extra_args,
local_directory="/home/b.weinstein/logs/",death_timeout=300)
и выполняю свою функцию (Generate.run)
cluster.scale(num_workers)
dask_client = Client(cluster)
#Start dask dashboard? Not clear yet.
dask_client.run_on_scheduler(start_tunnel)
###Set scheduler.
values = [delayed(Generate.run)(x) for x in data_paths]
Все отлично работает в течение часа, выполняя множество задач.Время от времени оно будет повреждено, и PIL вызовет исключение.
Пример исключения
(DeepLidar) [b.weinstein@login3 ~]$ python
Python 3.6.7 | packaged by conda-forge | (default, Nov 21 2018, 02:32:25)
[GCC 4.8.2 20140120 (Red Hat 4.8.2-15)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from PIL import Image
>>> Image.open("/ufrc/ewhite/b.weinstein/NeonData/SJER/DP3.30010.001/2018/FullSite/D17/2018_SJER_3/L3/Camera/Mosaic/V01/2018_SJER_3_252000_4114000_image.tif")
/home/b.weinstein/miniconda/envs/DeepLidar/lib/python3.6/site-packages/PIL/TiffImagePlugin.py:771: UserWarning: Corrupt EXIF data. Expecting to read 2 bytes but only got 0.
warnings.warn(str(msg))
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/home/b.weinstein/miniconda/envs/DeepLidar/lib/python3.6/site-packages/PIL/Image.py", line 2657, in open
% (filename if filename else fp))
OSError: cannot identify image file '/ufrc/ewhite/b.weinstein/NeonData/SJER/DP3.30010.001/2018/FullSite/D17/2018_SJER_3/L3/Camera/Mosaic/V01/2018_SJER_3_252000_4114000_image.tif'
Из чтения http://distributed.dask.org/en/latest/resilience.html#user-code-failures
Я ожидал, что задача потерпит неудачу,но кластер будет сохраняться.В худшем случае я ожидал, что работник потерпит неудачу.Вместо этого все рабочие и планировщик умирают.Почему исключение распространяется на планировщик?В журналах не так много информации:
KeyError: 'start'
Traceback (most recent call last):
File "/home/b.weinstein/DeepLidar/dask_generate.py", line 105, in <module>
run_HPC(data_paths)
File "/home/b.weinstein/DeepLidar/dask_generate.py", line 92, in run_HPC
results = compute(*values,scheduler=dask_client)
File "/home/b.weinstein/miniconda/envs/DeepLidar/lib/python3.6/site-packages/dask/base.py", line 397, in compute
results = schedule(dsk, keys, **kwargs)
File "/home/b.weinstein/miniconda/envs/DeepLidar/lib/python3.6/site-packages/distributed/client.py", line 2318, in get
direct=direct)
File "/home/b.weinstein/miniconda/envs/DeepLidar/lib/python3.6/site-packages/distributed/client.py", line 1652, in gather
asynchronous=asynchronous)
File "/home/b.weinstein/miniconda/envs/DeepLidar/lib/python3.6/site-packages/distributed/client.py", line 670, in sync
return sync(self.loop, func, *args, **kwargs)
File "/home/b.weinstein/miniconda/envs/DeepLidar/lib/python3.6/site-packages/distributed/utils.py", line 277, in sync
six.reraise(*error[0])
File "/apps/geos/3.6.2/lib/python3.6/site-packages/six.py", line 693, in reraise
raise value
File "/home/b.weinstein/miniconda/envs/DeepLidar/lib/python3.6/site-packages/distributed/utils.py", line 262, in f
result[0] = yield future
File "/home/b.weinstein/miniconda/envs/DeepLidar/lib/python3.6/site-packages/tornado/gen.py", line 1133, in run
value = future.result()
File "/home/b.weinstein/miniconda/envs/DeepLidar/lib/python3.6/site-packages/tornado/gen.py", line 1141, in run
yielded = self.gen.throw(*exc_info)
File "/home/b.weinstein/miniconda/envs/DeepLidar/lib/python3.6/site-packages/distributed/client.py", line 1497, in _gather
traceback)
File "/apps/geos/3.6.2/lib/python3.6/site-packages/six.py", line 692, in reraise
raise value.with_traceback(tb)
File "/home/b.weinstein/DeepLidar/DeepForest/Generate.py", line 28, in run
windows = preprocess.create_windows(data,DeepForest_config)
File "/home/b.weinstein/DeepLidar/DeepForest/preprocess.py", line 306, in create_windows
windows=compute_windows(image=image_path, pixels=DeepForest_config["patch_size"], overlap=DeepForest_config["patch_overlap"])
File "/home/b.weinstein/DeepLidar/DeepForest/preprocess.py", line 151, in compute_windows
im = Image.open(image)
File "/home/b.weinstein/miniconda/envs/DeepLidar/lib/python3.6/site-packages/PIL/Image.py", line 2657, in open
% (filename if filename else fp))
OSError: cannot identify image file '/ufrc/ewhite/b.weinstein/NeonData/SJER/DP3.30010.001/2018/FullSite/D17/2018_SJER_3/L3/Camera/Mosaic/V01/2018_SJER_3_252000_4114000_image.tif'
tornado.application - ERROR - Exception in Future <Future cancelled> after timeout
Traceback (most recent call last):
File "/home/b.weinstein/miniconda/envs/DeepLidar/lib/python3.6/site-packages/tornado/gen.py", line 970, in error_callback
future.result()
concurrent.futures._base.CancelledError
Я пытался увеличить память , не допуская проливания dask до диск .Что еще может вызвать это поведение?
Заключение оператора * compute в попытку, за исключением того, что показывает, что планировщик вызывает исключение, несмотря на предположение, что исключения не убивают клиентов.
try:
compute(*values,scheduler='distributed')
except Exception as e:
print(e)
выдает в логах клиента:
Invalid image /ufrc/ewhite/b.weinstein/NeonData/SJER/DP3.30010.001/2018/FullSite/D17/2018_SJER_3/L3/Camera/Mosaic/V01/2018_SJER_3_258000_4111000_image.tif
и убивает все.Я думал, что DASK был отказоустойчив для рабочих?
Отказоустойчивость в Spark против Dask
System Info:
Python 3.6.7 | packaged by conda-forge | (default, Nov 21 2018, 02:32:25)
[GCC 4.8.2 20140120 (Red Hat 4.8.2-15)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import dask
>>> dask.__version__
'0.20.2'
>>> import dask_jobqueue
>>> dask_jobqueue.__version__
'0.4.1'