Я пытаюсь обработать один большой (1 ТБ) json файл локально с помощью Dask. Файл имеет один объект на строку. Когда я не указываю размер блока в функции read_text
, код работает отлично, но только на одном работнике. Затем создается только один раздел, и только одна задача видна на приборной панели. Если я укажу blocksize
, все работники получат задания, но они никогда не уйдут от обработки (по крайней мере, в течение 12 часов). Что не так? Как заставить всех работников фактически выполнять работу?
Код выглядит следующим образом:
import dask.bag as db
from dask.distributed import Client, LocalCluster
cluster = LocalCluster(n_workers=4,
threads_per_worker=2,
memory_limit='2GB')
client = Client(cluster)
db.read_text('./data/uncompressed/latest-all.json', blocksize=1e8)\
.map(lambda obj: obj[:-2])\
.map(parse_json)\
.map(prune)\
.filter(lambda obj: obj != None)\
.map(json.dumps)\
.to_textfiles('./data/proc/*.json')
* parse_json
и prune
- это чистые python функции без ввода-вывода .
Вот фрагмент информационной панели, когда определен размер блока: