Сумка Dask застревает при обработке, когда задан размер блока, использует только одного работника, когда он не определен - PullRequest
0 голосов
/ 30 марта 2020

Я пытаюсь обработать один большой (1 ТБ) json файл локально с помощью Dask. Файл имеет один объект на строку. Когда я не указываю размер блока в функции read_text, код работает отлично, но только на одном работнике. Затем создается только один раздел, и только одна задача видна на приборной панели. Если я укажу blocksize, все работники получат задания, но они никогда не уйдут от обработки (по крайней мере, в течение 12 часов). Что не так? Как заставить всех работников фактически выполнять работу?

Код выглядит следующим образом:

import dask.bag as db
from dask.distributed import Client, LocalCluster
cluster = LocalCluster(n_workers=4, 
                threads_per_worker=2,
                memory_limit='2GB')

client = Client(cluster)

db.read_text('./data/uncompressed/latest-all.json', blocksize=1e8)\
    .map(lambda obj: obj[:-2])\
    .map(parse_json)\
    .map(prune)\
    .filter(lambda obj: obj != None)\
    .map(json.dumps)\
    .to_textfiles('./data/proc/*.json')

* parse_json и prune - это чистые python функции без ввода-вывода .

Вот фрагмент информационной панели, когда определен размер блока: dask dashboard

1 Ответ

1 голос
/ 04 апреля 2020

Мое первое предположение - они работают, но ваши функции просто очень медленные. Вы можете увидеть, что ваши работники делают, либо ...

  1. Глядя на страницу профиля, чтобы увидеть, на что рабочие тратят время на
  2. Просматривая информационную страницу, переходя к любому рабочий, а затем нажмите кнопку «Стеки вызовов», чтобы увидеть, над чем они работают

Вы также можете рассмотреть меньшие размеры чанков, чтобы посмотреть, помогает ли это ускорить процесс.

...