Передача итератора в функцию dask.delayed - PullRequest
0 голосов
/ 10 ноября 2018

Я пытаюсь передать итератор (нестандартного) файлового объекта в функцию dask.delayed. Когда я пытаюсь набрать compute(), я получаю следующее сообщение от dask и приведенную ниже трассировку.

distributed.protocol.pickle - INFO - Failed to serialize 
  ([<items>, ... ], OrderedDict(..)).
Exception: self.ptr cannot be converted to a Python object for pickling

Traceback (most recent call last):
  File "/home/user/miniconda3/lib/python3.6/site-packages/distributed/protocol/pickle.py", line 38, in dumps
    result = pickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
  File "stringsource", line 2, in pysam.libcbcf.VariantRecord.__reduce_cython__
TypeError: self.ptr cannot be converted to a Python object for pickling

Соответствующая часть источника выглядит следующим образом:

delayed(to_arrow)(vf.fetch(..), ordered_dict)

vf - это файловый объект, а vf.fetch(..) возвращает итератор для записей, присутствующих в файле (это VCF-файл , и я использую pysam библиотека для чтения). Я надеюсь, что это обеспечивает достаточный контекст.

Сообщение от dask показывает, что итерация происходит во время вызова функции, а не внутри функции, что привело меня к мысли, что, возможно, проходящие итераторы не подходят. Поэтому я сделал быструю проверку с sum(range(..)), которая, кажется, работает. Теперь я в тупике, что мне не хватает?

Предоставить минимальный рабочий пример для этого немного сложно. Но, может быть, поможет следующее:

  1. Загрузите файл VCF (и его индекс) из здесь : скажем, ALL.chrY*vcf.gz{,.tbi}
  2. pip3 install --user pysam
  3. Открыть файл: vf = VariantFile('/path/to/file.vcf.gz', mode='r')
  4. Что-то вроде итератора: vf.fetch("Y", 2_600_000, 2_700_000)
  5. Для отложенной функции у вас может быть пустой цикл.

1 Ответ

0 голосов
/ 18 ноября 2018

Краткий ответ: реструктурируйте свою функцию с задержкой так, чтобы этап открытия файла происходил внутри функции, и вместо этого вы передавали аргументы (например, путь), необходимые для указания на этот конкретный файл.

Если вам интересно, вы можете посмотреть, как Dask делает это внутренне, класс dask.bytes.core.OpenFile, который является сериализуемой вещью, которая откладывает открытие до тех пор, пока не будет использована в блоке with. Это один из удобных способов сделать это, но вы, вероятно, можете сделать что-то попроще.

...