Вы не можете сделать это любым полезным способом.Даже если вы решите использовать transform
stream.transform { rdd => {
val n = rdd.count
rdd.repartition(getNumParttitions(n))
}}
, это превосходит все цели операции, поскольку вам придется сканировать все данные до того, как перераспределение и первоначальное распределение останутся узким местом.
Вместо этого я бы рекомендовал правильную конфигурацию, основанную либо на spark.streaming.kafka.maxRatePerPartition
(старый API), либо на настройке противодавления (spark.streaming.backpressure.enabled
, spark.streaming.backpressure.initialRate
, более новый API)