Spark не может перераспределиться после Window.partitionBy - PullRequest
0 голосов
/ 28 сентября 2019

У меня есть фрейм данных df, который включает в себя два столбца:

  • GROUP_ID - их всего 3: 1, 2, 3
  • ELEMENT_ID - около 200Mиз них

Каждая группа имеет несколько элементов.

Есть и другие столбцы, например SCORE.

Мне нужно:

  1. Сортировка элементов по SCORE в пределах GROUP и вычисление совокупной суммы каждого элемента.
  2. Назначьте Bucket каждому SCORE в пределах GROUP, используя get_bucket_udf UDF.

  3. Создать структуру {'GROUP_ID', 'BUCKET', 'SCORE'} (SCORE_SET) для каждого элемента и объединить эти структуры в список для каждого элемента.

До шага 1 I разбить на GROUP_ID и, следовательно, шаги 1-2 выполняются только для 3 исполнителей, поскольку у нас есть только 3 ГРУППЫ.

Сразу после шага 2 я пытаюсь перераспределить по ELEMENT_ID, чтобы воспользоваться преимуществами 100 исполнителей, но, глядя на Hadoop Resource Manager, я вижу, чтоон все еще использует только 3 исполнителя.

Можно ли использовать всех исполнителей на шаге 3?Код ниже.Спасибо!

#Step 1
group_id_partition = Window.partitionBy('GROUP_ID')
group_id_ordered_by_score = df_partition.orderBy('SCORE')

group_id_score_ranked = df.select(
    '*',
    func.row_number().over(group_id_ordered_by_score).alias('SCORE_RANK'))

group_id_ordered_by_rank = group_id_partition.orderBy('SCORE_RANK')

# Compute cumulative sum for each row within group_id
df_enhanced = group_id_score_ranked.select(
        'ELEMENT_ID',
        func.sum('SCORE').over(group_id_partition).alias('SCORE_SUM'),
        func.sum('SCORE').over(
            group_id_ordered_by_rank.rangeBetween(Window.unboundedPreceding, Window.currentRow)
        ).alias('SCORE_CUMSUM')
    ).orderBy('ID', 'SCORE')

# Step 2
df_enhanced_bucketed = df_enhanced.select(
    '*',
    get_bucket_udf('SCORE_SUM', 'SCORE_CUMSUM').alias('BUCKET'))

# Repartition by ELEMENT_ID - does not redistribute over all executors
df_enhanced_bucketed_repartitioned = df_enhanced_bucketed.repartition('ELEMENT_ID')

# Step 3
output_df_structed = df_enhanced_bucketed_repartitioned.select(
    'ELEMENT_ID',
    func.struct('GROUP_ID', 'BUCKET', 'SCORE').alias('SCORE_SET'))

output_df = output_df_structed.groupBy('ELEMENT_ID').agg(
    func.collect_list('SCORE_SET').alias('SCORE_SETS'))

result =  output_df.select('ELEMENT_ID', 'SCORE_SETS')
result.take(1)
...