Я делаю сумку из простого текстового файла - он получил кучу отзывов, разделенных двумя символами новой строки.Но иногда - и я действительно не могу предсказать, когда - это дает мне FileNotFoundError: [Errno 2] No such file or directory: '/mnt/c/Workspaces/Books/Dask/foods.txt'
при обработке
Вот фактический код
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
import numpy as np
import dask.bag as bag
import os
def get_next_part(file, start_index, span_index=0, blocksize=1000):
file.seek(start_index)
buffer = file.read(blocksize + span_index).decode('cp1252')
delimiter_position = buffer.find('\n\n')
if delimiter_position == -1:
return get_next_part(file, start_index, span_index + blocksize)
else:
file.seek(start_index)
return start_index, delimiter_position
def get_item(filename, start_index, delimiter_position, encoding='cp1252'):
with open(filename, 'rb') as file_handle:
file_handle.seek(start_index)
text = file_handle.read(delimiter_position).decode(encoding)
return dict((element.split(': ')[0], element.split(': ')[1])
if len(element.split(': ')) > 1
else ('unknown', element)
for element in text.strip().split('\n'))
with open(f"{os.getcwd()}/foods.txt", 'rb') as file_handle:
size = file_handle.seek(0,2) - 1
more_data = True
output = []
current_position = next_position = 0
while more_data:
if current_position >= size:
more_data = False
else:
current_position, next_position = get_next_part(file_handle, current_position, 0)
output.append((current_position, next_position))
current_position = current_position + next_position + 2
with ProgressBar():
reviews = (bag.from_sequence(output, npartitions=104)
.map(lambda x: get_item(f"{os.getcwd()}/foods.txt",
x[0],
x[1]))
.compute())
Иногда это работает нормально, но в других случаях этодает мне что-то в этом духе (каждый раз разный процент):
[########## ] | 26% Completed | 54.3s
---------------------------------------------------------------------------
FileNotFoundError Traceback (most recent call last)
<ipython-input-1-90a316620d10> in <module>()
42 with ProgressBar():
43 reviews = (bag.from_sequence(output, npartitions=104)
---> 44 .map(lambda x: get_item(f"{os.getcwd()}/foods.txt",
45 x[0],
46 x[1]))
~/anaconda3/envs/py36/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs)
154 dask.base.compute
155 """
--> 156 (result,) = compute(self, traverse=False, **kwargs)
157 return result
158
~/anaconda3/envs/py36/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
396 keys = [x.__dask_keys__() for x in collections]
397 postcomputes = [x.__dask_postcompute__() for x in collections]
--> 398 results = schedule(dsk, keys, **kwargs)
399 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
400
~/anaconda3/envs/py36/lib/python3.6/site-packages/dask/multiprocessing.py in get(dsk, keys, num_workers, func_loads, func_dumps, optimize_graph, pool, **kwargs)
190 get_id=_process_get_id, dumps=dumps, loads=loads,
191 pack_exception=pack_exception,
--> 192 raise_exception=reraise, **kwargs)
193 finally:
194 if cleanup:
~/anaconda3/envs/py36/lib/python3.6/site-packages/dask/local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
460 _execute_task(task, data) # Re-execute locally
461 else:
--> 462 raise_exception(exc, tb)
463 res, worker_id = loads(res_info)
464 state['cache'][key] = res
~/anaconda3/envs/py36/lib/python3.6/site-packages/dask/compatibility.py in reraise(exc, tb)
109 def reraise(exc, tb=None):
110 if exc.__traceback__ is not tb:
--> 111 raise exc.with_traceback(tb)
112 raise exc
113
~/anaconda3/envs/py36/lib/python3.6/site-packages/dask/local.py in execute_task()
228 try:
229 task, data = loads(task_info)
--> 230 result = _execute_task(task, data)
231 id = get_id()
232 result = dumps((result, id))
~/anaconda3/envs/py36/lib/python3.6/site-packages/dask/core.py in _execute_task()
117 func, args = arg[0], arg[1:]
118 args2 = [_execute_task(a, cache) for a in args]
--> 119 return func(*args2)
120 elif not ishashable(arg):
121 return arg
~/anaconda3/envs/py36/lib/python3.6/site-packages/dask/bag/core.py in reify()
1589 def reify(seq):
1590 if isinstance(seq, Iterator):
-> 1591 seq = list(seq)
1592 if seq and isinstance(seq[0], Iterator):
1593 seq = list(map(list, seq))
~/anaconda3/envs/py36/lib/python3.6/site-packages/dask/bag/core.py in map_chunk()
1749 else:
1750 for a in zip(*args):
-> 1751 yield f(*a)
1752
1753 # Check that all iterators are fully exhausted
<ipython-input-1-90a316620d10> in <lambda>()
44 .map(lambda x: get_item(f"{os.getcwd()}/foods.txt",
45 x[0],
---> 46 x[1]))
47 .compute())
<ipython-input-1-90a316620d10> in get_item()
18
19 def get_item(filename, start_index, delimiter_position, encoding='cp1252'):
---> 20 with open(filename, 'rb') as file_handle:
21 file_handle.seek(start_index)
22 text = file_handle.read(delimiter_position).decode(encoding)
FileNotFoundError: [Errno 2] No such file or directory: '/mnt/c/Workspaces/Books/Dask/foods.txt'
Я пробовал возиться с номерами разделов - оставив его по умолчанию (101), или убедившись, что он кратен 4.Кажется, не имеет никакого эффекта.
Кто-нибудь знает, что здесь происходит?Обычно это работает, если я запускаю его во второй раз, но с этим все еще трудно справиться.
Я использую последнюю версию Dask.Используя conda, все это в Jupyterlab, и я запускаю его из подсистемы Windows для Linux
Спасибо!