Я хочу coalesce
разделов в приложении Spark Streaming, чтобы у меня оставался один раздел на исполнителя (для определенного вспомогательного этапа).Это должно происходить динамически, поскольку приложение является долгоживущим, и исполнители могут выйти из строя и быть восстановлены в течение этого времени.
Хорошо, если операция coalesce
использует недопустимое количество исполнителей для одной или двух микробатчей, когда число живых исполнителей колеблется.
Насколько я могу судить, числоИсполнители могут быть получены только динамически из SparkContext
, который не сериализуем и поэтому не может использоваться внутри DStream
foreachRDD
.Я хочу избежать перестановки данных, поэтому использование repartition
API DStream не вариант.
т.е. это не сработает:
metrics.foreachRDD { (rdd, time) => rdd
.coalesce(ssc.sparkContext.getExecutorMemoryStatus.size - 1)
.mapPartitions(it => { ...
PS: Если кому-то интересно, почему- вспомогательный этап связан с агрегацией метрик для конкретного приложения, и я хочу избежать увеличения моей кардинальности метрической метки, которая уже достаточно высока, еще на несколько тысяч (количество разделов в кластере).Нет необходимости в перетасовке, так как я хочу видеть метрики о том, что на самом деле обрабатывал каждый исполнитель (используя тег host
), в противном случае я мог бы просто сделать reduceByKey
или countByValue
для DStream
.