Хотя вопрос не совсем легок для понимания, следующее совпадает с другим ответом, и этот подход должен избежать проблем, упомянутых при ненужной перетасовке:
val n = [... some calculation for number of partitions / executors based on cluster config and volume of data to process ...]
df.repartition(n, $"field_1", $"field_2", ...)
.sortWithinPartitions("fieldx", "field_y")
.write.partitionBy("field_1", "field_2", ...)
.format("location")
whereby [field_1, field_2, ...] - это один и тот же набор полей для repartition и partitionBy.