Я пытаюсь передать итератор (нестандартного) файлового объекта в функцию dask.delayed
. Когда я пытаюсь набрать compute()
, я получаю следующее сообщение от dask и приведенную ниже трассировку.
distributed.protocol.pickle - INFO - Failed to serialize
([<items>, ... ], OrderedDict(..)).
Exception: self.ptr cannot be converted to a Python object for pickling
Traceback (most recent call last):
File "/home/user/miniconda3/lib/python3.6/site-packages/distributed/protocol/pickle.py", line 38, in dumps
result = pickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
File "stringsource", line 2, in pysam.libcbcf.VariantRecord.__reduce_cython__
TypeError: self.ptr cannot be converted to a Python object for pickling
Соответствующая часть источника выглядит следующим образом:
delayed(to_arrow)(vf.fetch(..), ordered_dict)
vf
- это файловый объект, а vf.fetch(..)
возвращает итератор для записей, присутствующих в файле (это VCF-файл , и я использую pysam
библиотека для чтения). Я надеюсь, что это обеспечивает достаточный контекст.
Сообщение от dask
показывает, что итерация происходит во время вызова функции, а не внутри функции, что привело меня к мысли, что, возможно, проходящие итераторы не подходят. Поэтому я сделал быструю проверку с sum(range(..))
, которая, кажется, работает. Теперь я в тупике, что мне не хватает?
Предоставить минимальный рабочий пример для этого немного сложно. Но, может быть, поможет следующее:
- Загрузите файл VCF (и его индекс) из здесь : скажем,
ALL.chrY*vcf.gz{,.tbi}
pip3 install --user pysam
- Открыть файл:
vf = VariantFile('/path/to/file.vcf.gz', mode='r')
- Что-то вроде итератора:
vf.fetch("Y", 2_600_000, 2_700_000)
- Для отложенной функции у вас может быть пустой цикл.