Файл, заблокированный от записи, иногда не может найти содержимое (при открытии маринованных панд DataFrame) - EOFError: исчерпан ввод - PullRequest
0 голосов
/ 29 октября 2018

Вот моя ситуация: мне нужно запустить 300 процессов в кластере (они независимы), чтобы все добавили часть своих данных в один и тот же DataFrame (им также нужно будет прочитать файл перед записью в следствии). Возможно, им придется делать это несколько раз в течение всего времени их выполнения.

Поэтому я попытался использовать заблокированные от записи файлы с пакетом portalocker. Тем не менее, я получаю ошибку типа, и я не понимаю, откуда она.

Вот скелетный код, в котором каждый процесс будет записывать в один и тот же файл:

with portalocker.Lock('/path/to/file.pickle', 'rb+', timeout=120) as file:
    file.seek(0)
    df = pd.read_pickle(file)

    # ADD A ROW TO THE DATAFRAME

    # The following part might not be great,
    # I'm trying to remove the old contents of the file first so I overwrite
    # and not append, not sure if this is required or if there's
    # a better way to do this.
    file.seek(0)
    file.truncate()
    df.to_pickle(file)

Вышеуказанное работает большую часть времени. Однако чем больше одновременных процессов у меня блокирует запись, тем больше я получаю ошибку EOFError на этапе pd.read_pickle(file).

EOFError: Ran out of input

Отслеживание очень длинное и запутанное.

В любом случае, мои мысли пока таковы, что, поскольку он иногда работает, приведенный выше код должен быть в порядке * (хотя он может быть грязным, и я не возражаю услышать о лучшем способе сделать то же самое).

Однако, когда у меня слишком много процессов, пытающихся заблокировать запись, я подозреваю, что у файла нет времени или чего-то еще, или, по крайней мере, каким-то образом следующие процессы еще не видят содержимое, которое было сохранено предыдущий процесс.

Был бы способ обойти это? Я попытался добавить в time.sleep(0.5) операторы вокруг моего кода (до read_pickle, после to_pickle), и я не думаю, это помогло. Кто-нибудь понимает, что может происходить, или знает лучший способ сделать это?

Также обратите внимание, я не думаю, что время блокировки записи истекло. Я попытался синхронизировать процесс, и я также добавил туда флаг, чтобы отметить, если время блокировки записи истекло. В то время как есть 300 процессов, и они могут пытаться писать с разной скоростью, в целом я бы оценил, что примерно 2,5 записи в секунду, что не похоже на перегрузку системы? *

* Размер маринованного DataFrame составляет несколько сотен КБ.

1 Ответ

0 голосов
/ 30 октября 2018

Возможно, это был немного расплывчатый вопрос, но, поскольку мне удалось это исправить и лучше понять ситуацию, я думаю, что было бы полезно опубликовать здесь то, что я нашел, в надежде, что другие тоже смогут это найти. Мне кажется, что это проблема, которая потенциально может подойти и другим.

Зачислите на страницу github «portalocker» за помощь и ответ: https://github.com/WoLpH/portalocker/issues/40

Кажется, проблема в том, что я делал это в кластере. В результате файловая система немного усложняется, так как процессы работают на нескольких узлах. "Синхронизация" сохраненных файлов может занять больше времени, чем ожидалось, и их будут видеть разные запущенные процессы.

То, что мне показалось удачным, - это очистить файл и принудительно запустить синхронизацию системы, а также (еще не уверен, если это необязательно) добавить после этого более длительный time.sleep ().

По словам разработчика «portalocker», для синхронизации файлов в кластере может потребоваться непредсказуемое время, поэтому может потребоваться изменить время ожидания.

Другими словами, добавьте это после сохранения файла:

df.to_pickle(file)
file.flush()
os.fsync(file.fileno())

time.sleep(1)

Надеюсь, это поможет кому-то еще.

...