Как обеспечить, чтобы количество «разделов» было одинаково распределено между работниками с dask и dask-cudf? - PullRequest
1 голос
/ 04 октября 2019

Я пытаюсь выполнить базовый ETL рабочий процесс для больших файлов среди работников, используя dask-cudf для большого количества работников.

Проблема:

Первоначальноscheduler планирует равное количество partitions для чтения среди рабочих, но во время предварительной обработки имеет тенденцию распределять / перемешивать их среди рабочих.

Минимальное число разделов, которые получает работник, составляет 4, а максимальное число получаемых им разделов 19 (total partitions = apprx. 300, num_workers = 22), такое поведение вызываетпроблема вниз по течению, так как я хочу равного распределения разделов между работниками.

Есть ли способ предотвратить такое поведение?

Я думал, что ниже поможет с этим, но это не так.

# limit work-stealing as much as possible
dask.config.set({'distributed.scheduler.work-stealing': False})
dask.config.set({'distributed.scheduler.bandwidth': 1})

Рабочий процесс выполняется:

  • чтение
  • fill-na
  • логика приведения / другая логика

df = dask_cudf.read_csv(path = `big_files`,
                        names = names,
                        delimiter='\t',
                        dtype = read_dtype_ls,
                        chunksize=chunksize)


df = df.map_partitions(lambda df:df.fillna(-1))

def transform_col_int64_to_int32(df, columns):
    """
        This function casts int64s columns to int32s 
        we are using this to transform int64s to int32s and overflows seem to be consitent
    """
    for col in columns:
        df[col] = df[col].astype(np.int32)
    return df

df = df.map_partitions(transform_col_int64_to_int32,cat_col_names)
df = df.persist()

1 Ответ

1 голос
/ 04 октября 2019

Графики Dask, в которых задачи выполняются, основаны на ряде факторов, включая зависимости данных, время выполнения, использование памяти и т. Д. Обычно ответом на эти вопросы является «просто дайте этому сделать свое дело». Самая распространенная причина плохого планирования - слишком мало чанков.

Однако, если вам явно требуется более сбалансированный дистрибутив, вы можете попробовать метод Client.rebalance .

wait(df)
client.rebalance(df)

Однако следует помнить, что перебалансировка не так надежна, как другие операции Dask. Лучше всего делать это в то время, когда не выполняется тонна другой работы (отсюда и звонок на dask.distributed.wait выше).

Кроме того, я бы включил воровство работы. Воровство работы - другое название для распределения нагрузки.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...