Несбалансированная задача распределения Spark (не соответствует количеству повторных разделов) - PullRequest
0 голосов
/ 04 августа 2020

Я хочу уменьшить использование памяти для каждой задачи. Поэтому я использую повторное разбиение, чтобы уменьшить размер задания.

Подготовка данных

group_cols = ['product_id', 'city_zip']
partition_number=8000

df = read_features(spark, start_time, end_time, input_path=input_path, group_cols=group_cols)

df = df.agg(
    F.collect_list(
        F.struct(*data_cols)
    ).alias('data')
)

if group_min_size:
    group_min_size = int(group_min_size)
    df = df.filter(F.size('data') >= group_min_size)


# ---! repartition here !

df = df.repartition(partition_number, *group_cols) 
df = df.groupBy(*group_cols)



sp = spark.read.parquet(TB.STORE_PRODUCT)
sp.cache()
total = sp.dropDuplicates(['product_code', 'city_zip']).count()

# --- total groups !

print(total)
# 22536

df = df.join(
    store_product,
    on=[*group_cols],
    how='inner'
)

Уменьшение карты

pipe = AutoARIMAPipe(....)  # simple wrap of pmdarima

# train a model for each group
df1 = (df.rdd
      .map(lambda r: r.asDict())
      .map(lambda d: pipe.transform_data(d))
      .map(lambda d: pipe.create_model(d))
      .map(lambda d: pipe.fit(d))
      .filter(lambda d: d.get('model') is not None )  # 
      .map(lambda d: pipe.pickle_model(d))
)

Что я ожидаю

  1. df.rdd.map разделит общее количество (общее количество групп 22536) на 8000 частей.
  2. Таким образом, только от 2 до 3 групп (22536/8000 = 2,817) для каждой задачи. Spark должен равномерно распределить группу по номеру переразбивки

Но это не так.

В моей задаче много ошибок

Есть много ошибок памяти:

ExecutorLostFailure (executor 8 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 8.0 GB of 8 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead or disabling yarn.nodemanager.vmem-check-enabled because of YARN-4714.

enter image description here

This is the excact reason I set partition_number=8000 in order to reduce memory usage on each node .

Then I check a task's log, you can see there are much more groups than 3 were distributed to this task . Why ??

введите описание изображения здесь

Как мне достичь своей цели по сокращению использования памяти каждой задачей, чтобы предотвратить OOM на узлах исполнителя искры?

...