У меня есть фрейм данных df
, который включает в себя два столбца:
- GROUP_ID - их всего 3: 1, 2, 3
- ELEMENT_ID - около 200Mиз них
Каждая группа имеет несколько элементов.
Есть и другие столбцы, например SCORE.
Мне нужно:
- Сортировка элементов по SCORE в пределах GROUP и вычисление совокупной суммы каждого элемента.
Назначьте Bucket каждому SCORE в пределах GROUP, используя get_bucket_udf
UDF.
Создать структуру {'GROUP_ID', 'BUCKET', 'SCORE'}
(SCORE_SET) для каждого элемента и объединить эти структуры в список для каждого элемента.
До шага 1 I разбить на GROUP_ID и, следовательно, шаги 1-2 выполняются только для 3 исполнителей, поскольку у нас есть только 3 ГРУППЫ.
Сразу после шага 2 я пытаюсь перераспределить по ELEMENT_ID, чтобы воспользоваться преимуществами 100 исполнителей, но, глядя на Hadoop Resource Manager, я вижу, чтоон все еще использует только 3 исполнителя.
Можно ли использовать всех исполнителей на шаге 3?Код ниже.Спасибо!
#Step 1
group_id_partition = Window.partitionBy('GROUP_ID')
group_id_ordered_by_score = df_partition.orderBy('SCORE')
group_id_score_ranked = df.select(
'*',
func.row_number().over(group_id_ordered_by_score).alias('SCORE_RANK'))
group_id_ordered_by_rank = group_id_partition.orderBy('SCORE_RANK')
# Compute cumulative sum for each row within group_id
df_enhanced = group_id_score_ranked.select(
'ELEMENT_ID',
func.sum('SCORE').over(group_id_partition).alias('SCORE_SUM'),
func.sum('SCORE').over(
group_id_ordered_by_rank.rangeBetween(Window.unboundedPreceding, Window.currentRow)
).alias('SCORE_CUMSUM')
).orderBy('ID', 'SCORE')
# Step 2
df_enhanced_bucketed = df_enhanced.select(
'*',
get_bucket_udf('SCORE_SUM', 'SCORE_CUMSUM').alias('BUCKET'))
# Repartition by ELEMENT_ID - does not redistribute over all executors
df_enhanced_bucketed_repartitioned = df_enhanced_bucketed.repartition('ELEMENT_ID')
# Step 3
output_df_structed = df_enhanced_bucketed_repartitioned.select(
'ELEMENT_ID',
func.struct('GROUP_ID', 'BUCKET', 'SCORE').alias('SCORE_SET'))
output_df = output_df_structed.groupBy('ELEMENT_ID').agg(
func.collect_list('SCORE_SET').alias('SCORE_SETS'))
result = output_df.select('ELEMENT_ID', 'SCORE_SETS')
result.take(1)