Dask получает «FileNotFoundError: [Errno 2] Нет такого файла или каталога» в середине файла - PullRequest
0 голосов
/ 06 мая 2019

Я делаю сумку из простого текстового файла - он получил кучу отзывов, разделенных двумя символами новой строки.Но иногда - и я действительно не могу предсказать, когда - это дает мне FileNotFoundError: [Errno 2] No such file or directory: '/mnt/c/Workspaces/Books/Dask/foods.txt' при обработке

Вот фактический код

from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
import numpy as np
import dask.bag as bag
import os

def get_next_part(file, start_index, span_index=0, blocksize=1000):
    buffer = file.read(blocksize + span_index).decode('cp1252')
    delimiter_position = buffer.find('\n\n')
    if delimiter_position == -1:
        return get_next_part(file, start_index, span_index + blocksize)
        return start_index, delimiter_position

def get_item(filename, start_index, delimiter_position, encoding='cp1252'):
    with open(filename, 'rb') as file_handle:
        text = file_handle.read(delimiter_position).decode(encoding)
        return dict((element.split(': ')[0], element.split(': ')[1])
                               if len(element.split(': ')) > 1
                               else ('unknown', element)
                               for element in text.strip().split('\n'))    

with open(f"{os.getcwd()}/foods.txt", 'rb') as file_handle:
    size = file_handle.seek(0,2) - 1
    more_data = True
    output = []
    current_position = next_position = 0
    while more_data:
        if current_position >= size:
            more_data = False
            current_position, next_position = get_next_part(file_handle, current_position, 0)
            output.append((current_position, next_position))
            current_position = current_position + next_position + 2

with ProgressBar():
    reviews = (bag.from_sequence(output, npartitions=104)
               .map(lambda x: get_item(f"{os.getcwd()}/foods.txt", 

Иногда это работает нормально, но в других случаях этодает мне что-то в этом духе (каждый раз разный процент):

[##########                              ] | 26% Completed | 54.3s
FileNotFoundError                         Traceback (most recent call last)
<ipython-input-1-90a316620d10> in <module>()
     42 with ProgressBar():
     43     reviews = (bag.from_sequence(output, npartitions=104)
---> 44                .map(lambda x: get_item(f"{os.getcwd()}/foods.txt", 
     45                                        x[0],
     46                                        x[1]))

~/anaconda3/envs/py36/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs)
    154         dask.base.compute
    155         """
--> 156         (result,) = compute(self, traverse=False, **kwargs)
    157         return result

~/anaconda3/envs/py36/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
    396     keys = [x.__dask_keys__() for x in collections]
    397     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 398     results = schedule(dsk, keys, **kwargs)
    399     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

~/anaconda3/envs/py36/lib/python3.6/site-packages/dask/multiprocessing.py in get(dsk, keys, num_workers, func_loads, func_dumps, optimize_graph, pool, **kwargs)
    190                            get_id=_process_get_id, dumps=dumps, loads=loads,
    191                            pack_exception=pack_exception,
--> 192                            raise_exception=reraise, **kwargs)
    193     finally:
    194         if cleanup:

~/anaconda3/envs/py36/lib/python3.6/site-packages/dask/local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
    460                         _execute_task(task, data)  # Re-execute locally
    461                     else:
--> 462                         raise_exception(exc, tb)
    463                 res, worker_id = loads(res_info)
    464                 state['cache'][key] = res

~/anaconda3/envs/py36/lib/python3.6/site-packages/dask/compatibility.py in reraise(exc, tb)
    109     def reraise(exc, tb=None):
    110         if exc.__traceback__ is not tb:
--> 111             raise exc.with_traceback(tb)
    112         raise exc

~/anaconda3/envs/py36/lib/python3.6/site-packages/dask/local.py in execute_task()
    228     try:
    229         task, data = loads(task_info)
--> 230         result = _execute_task(task, data)
    231         id = get_id()
    232         result = dumps((result, id))

~/anaconda3/envs/py36/lib/python3.6/site-packages/dask/core.py in _execute_task()
    117         func, args = arg[0], arg[1:]
    118         args2 = [_execute_task(a, cache) for a in args]
--> 119         return func(*args2)
    120     elif not ishashable(arg):
    121         return arg

~/anaconda3/envs/py36/lib/python3.6/site-packages/dask/bag/core.py in reify()
   1589 def reify(seq):
   1590     if isinstance(seq, Iterator):
-> 1591         seq = list(seq)
   1592     if seq and isinstance(seq[0], Iterator):
   1593         seq = list(map(list, seq))

~/anaconda3/envs/py36/lib/python3.6/site-packages/dask/bag/core.py in map_chunk()
   1749     else:
   1750         for a in zip(*args):
-> 1751             yield f(*a)
   1753     # Check that all iterators are fully exhausted

<ipython-input-1-90a316620d10> in <lambda>()
     44                .map(lambda x: get_item(f"{os.getcwd()}/foods.txt", 
     45                                        x[0],
---> 46                                        x[1]))
     47               .compute())

<ipython-input-1-90a316620d10> in get_item()
     19 def get_item(filename, start_index, delimiter_position, encoding='cp1252'):
---> 20     with open(filename, 'rb') as file_handle:
     21         file_handle.seek(start_index)
     22         text = file_handle.read(delimiter_position).decode(encoding)

FileNotFoundError: [Errno 2] No such file or directory: '/mnt/c/Workspaces/Books/Dask/foods.txt'

Я пробовал возиться с номерами разделов - оставив его по умолчанию (101), или убедившись, что он кратен 4.Кажется, не имеет никакого эффекта.

Кто-нибудь знает, что здесь происходит?Обычно это работает, если я запускаю его во второй раз, но с этим все еще трудно справиться.

Я использую последнюю версию Dask.Используя conda, все это в Jupyterlab, и я запускаю его из подсистемы Windows для Linux


1 Ответ

0 голосов
/ 14 мая 2019

Не удалось исправить мой метод начального чтения, но удалось найти другой способ параллельного чтения (также с нативными объектами Dask!)

Разделы были разделены \n\n, а аргумент linedelimiter для bag не означал того, что я думал, что это означало, но с этим я смог найти способ получить нужные мне разделы: Почему `linedelimiter` не работает для bag.read_text?

        .map_partitions(lambda x: "".join(x).split("\n\n"))