Вот моя ситуация: мне нужно запустить 300 процессов в кластере (они независимы), чтобы все добавили часть своих данных в один и тот же DataFrame (им также нужно будет прочитать файл перед записью в следствии). Возможно, им придется делать это несколько раз в течение всего времени их выполнения.
Поэтому я попытался использовать заблокированные от записи файлы с пакетом portalocker
. Тем не менее, я получаю ошибку типа, и я не понимаю, откуда она.
Вот скелетный код, в котором каждый процесс будет записывать в один и тот же файл:
with portalocker.Lock('/path/to/file.pickle', 'rb+', timeout=120) as file:
file.seek(0)
df = pd.read_pickle(file)
# ADD A ROW TO THE DATAFRAME
# The following part might not be great,
# I'm trying to remove the old contents of the file first so I overwrite
# and not append, not sure if this is required or if there's
# a better way to do this.
file.seek(0)
file.truncate()
df.to_pickle(file)
Вышеуказанное работает большую часть времени. Однако чем больше одновременных процессов у меня блокирует запись, тем больше я получаю ошибку EOFError на этапе pd.read_pickle(file)
.
EOFError: Ran out of input
Отслеживание очень длинное и запутанное.
В любом случае, мои мысли пока таковы, что, поскольку он иногда работает, приведенный выше код должен быть в порядке * (хотя он может быть грязным, и я не возражаю услышать о лучшем способе сделать то же самое).
Однако, когда у меня слишком много процессов, пытающихся заблокировать запись, я подозреваю, что у файла нет времени или чего-то еще, или, по крайней мере, каким-то образом следующие процессы еще не видят содержимое, которое было сохранено предыдущий процесс.
Был бы способ обойти это? Я попытался добавить в time.sleep(0.5)
операторы вокруг моего кода (до read_pickle
, после to_pickle
), и я не думаю, это помогло. Кто-нибудь понимает, что может происходить, или знает лучший способ сделать это?
Также обратите внимание, я не думаю, что время блокировки записи истекло. Я попытался синхронизировать процесс, и я также добавил туда флаг, чтобы отметить, если время блокировки записи истекло. В то время как есть 300 процессов, и они могут пытаться писать с разной скоростью, в целом я бы оценил, что примерно 2,5 записи в секунду, что не похоже на перегрузку системы? *
* Размер маринованного DataFrame составляет несколько сотен КБ.