«Невозможно выбрать файлы, которые не открыты для чтения» с помощью Client.map () - PullRequest
1 голос
/ 11 апреля 2019

Я пытаюсь использовать dask.distributed для одновременного обновления базы данных Postgresql на основе содержимого из нескольких файлов CSV.В идеале мы распределяли бы файлы CSV среди N работников, где каждый работник вставлял бы содержимое базы данных CSV в базу данных.Тем не менее, мы получаем исключение Cannot pickle files that are not opened for reading при использовании Client.map() при распространении задач среди работников.

Это сжатая версия кода:

def _work(csv_path):
   db = Database() # encapsulates interaction w/ postgresql database
   db.open()

   count = 0

   with csv_path.open('r') as csv_file:
       reader = csv.DictReader(csv_file)

       for record in reader:
           db.insert(record)
           count += 1

   db.close()

   return count


client = Client(processes=False)

csv_files = Path('/data/files/').glob('*.csv')

csv_futures = client.map(_work, csv_files) # error occurs here

for finished in  as_completed(csv_futures):
   count = finished.result()
   print(count)

Исходя из связанных с этим проблем стекопотока и github, я успешно использовал cloudpickle для сериализации и десериализации функции и аргументов.

cloudpickle.loads(cloudpickle.dumps(_work))
Out[69]: <function _work(csv_path)>

и

files = list(Path('/data/files/').glob('*.csv'))
files
Out[73]: 
[PosixPath('/data/files/208.csv'),
 PosixPath('/data/files/332.csv'),
 PosixPath('/data/files/125.csv'),
 PosixPath('/data/files/8.csv')]
cloudpickle.loads(cloudpickle.dumps(files))
Out[74]: 
[PosixPath('/data/files/208.csv'),
 PosixPath('/data/files/332.csv'),
 PosixPath('/data/files/125.csv'),
 PosixPath('/data/files/8.csv')]

Итак, проблема в другом.

1 Ответ

1 голос
/ 12 апреля 2019

Точное исключение было таким:

  File "/Users/may/anaconda/envs/eagle-i/lib/python3.6/site-packages/cloudpickle/cloudpickle.py", line 841, in save_file
    raise pickle.PicklingError("Cannot pickle files that are not opened for reading: %s" % obj.mode)
_pickle.PicklingError: Cannot pickle files that are not opened for reading: a

Проходя через отладчик, мне было любопытно, что было obj, а именно:

<_io.TextIOWrapper name='/tmp/logs/ei_sched.log' mode='a' encoding='UTF-8'>

В приведенном выше фрагменте кода я пропустил звонки нашему регистратору, и именно по этому поводу cloudpickle жаловался.Ведение журнала было оставшимся артефактом этой функциональности до попытки использовать dask для распараллеливания этой функциональности.После того, как я удалил вызовы журналирования из функции, переданной в Client.map(), все заработало как положено.

Кроме того, это был хороший улов из cloudpickle, поскольку запись в один файл не должна выполняться изработники даска.

...