Я пытаюсь выполнить базовый ETL рабочий процесс для больших файлов среди работников, используя dask-cudf
для большого количества работников.
Проблема:
Первоначальноscheduler
планирует равное количество partitions
для чтения среди рабочих, но во время предварительной обработки имеет тенденцию распределять / перемешивать их среди рабочих.
Минимальное число разделов, которые получает работник, составляет 4
, а максимальное число получаемых им разделов 19
(total partitions
= apprx. 300
, num_workers
= 22
), такое поведение вызываетпроблема вниз по течению, так как я хочу равного распределения разделов между работниками.
Есть ли способ предотвратить такое поведение?
Я думал, что ниже поможет с этим, но это не так.
# limit work-stealing as much as possible
dask.config.set({'distributed.scheduler.work-stealing': False})
dask.config.set({'distributed.scheduler.bandwidth': 1})
Рабочий процесс выполняется:
- чтение
- fill-na
- логика приведения / другая логика
df = dask_cudf.read_csv(path = `big_files`,
names = names,
delimiter='\t',
dtype = read_dtype_ls,
chunksize=chunksize)
df = df.map_partitions(lambda df:df.fillna(-1))
def transform_col_int64_to_int32(df, columns):
"""
This function casts int64s columns to int32s
we are using this to transform int64s to int32s and overflows seem to be consitent
"""
for col in columns:
df[col] = df[col].astype(np.int32)
return df
df = df.map_partitions(transform_col_int64_to_int32,cat_col_names)
df = df.persist()