Я хочу уменьшить использование памяти для каждой задачи. Поэтому я использую повторное разбиение, чтобы уменьшить размер задания.
Подготовка данных
group_cols = ['product_id', 'city_zip']
partition_number=8000
df = read_features(spark, start_time, end_time, input_path=input_path, group_cols=group_cols)
df = df.agg(
F.collect_list(
F.struct(*data_cols)
).alias('data')
)
if group_min_size:
group_min_size = int(group_min_size)
df = df.filter(F.size('data') >= group_min_size)
# ---! repartition here !
df = df.repartition(partition_number, *group_cols)
df = df.groupBy(*group_cols)
sp = spark.read.parquet(TB.STORE_PRODUCT)
sp.cache()
total = sp.dropDuplicates(['product_code', 'city_zip']).count()
# --- total groups !
print(total)
# 22536
df = df.join(
store_product,
on=[*group_cols],
how='inner'
)
Уменьшение карты
pipe = AutoARIMAPipe(....) # simple wrap of pmdarima
# train a model for each group
df1 = (df.rdd
.map(lambda r: r.asDict())
.map(lambda d: pipe.transform_data(d))
.map(lambda d: pipe.create_model(d))
.map(lambda d: pipe.fit(d))
.filter(lambda d: d.get('model') is not None ) #
.map(lambda d: pipe.pickle_model(d))
)
Что я ожидаю
df.rdd.map
разделит общее количество (общее количество групп 22536) на 8000 частей. - Таким образом, только от 2 до 3 групп (22536/8000 = 2,817) для каждой задачи. Spark должен равномерно распределить группу по номеру переразбивки
Но это не так.
В моей задаче много ошибок
Есть много ошибок памяти:
ExecutorLostFailure (executor 8 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 8.0 GB of 8 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead or disabling yarn.nodemanager.vmem-check-enabled because of YARN-4714.
This is the excact reason I set partition_number=8000
in order to reduce memory usage on each node .
Then I check a task's log, you can see there are much more groups than 3 were distributed to this task . Why ??
введите описание изображения здесь
Как мне достичь своей цели по сокращению использования памяти каждой задачей, чтобы предотвратить OOM на узлах исполнителя искры?