Я использую Scala на Flink с DataSet API.
Я хочу перераспределить мои данные по узлам. В Spark есть функция, которая позволяет пользователю переразбивать данные с заданным параметром numberOfPartitions ( link ), и я считаю, что Flink не поддерживает такую функцию.
Таким образом, я хотел достичь этого путем реализации пользовательской функции разбиения.
Мои данные имеют тип DataSet (Double, SparseVector)
Пример строки из данных:
(1.0 SparseVector((2024,1.0), (2025,1.0), (2030,1.0), (2045,1.0), (2046,1.41), (2063,1.0), (2072,1.0), (3031,1.0), (3032,1.0), (4757,1.0), (4790,1.0), (177196,1.0), (177197,0.301), (177199,1.0), (177202,1.0), (1544177,1.0), (1544178,1.0), (1544179,1.0), (1654031,1.0), (1654190,1.0), (1654191,1.0), (1654192,1.0), (1654193,1.0), (1654194,1.0), (1654212,1.0), (1654237,1.0), (1654238,1.0)))
Так как мой "Double" является двоичным (1 или -1), я хочу разделить мои данные в соответствии с длиной SparceVector.
Мой пользовательский разделитель выглядит следующим образом:
class myPartitioner extends Partitioner[SparseVector]
{
override def partition(key: SparseVector, numPartitions: Int): Int = {
key.size % numPartitions
}
}
Я называю этот пользовательский разделитель следующим образом:
data.partitionCustom(new myPartitioner(),1)
Может кто-нибудь помочь мне понять, как указать количество разделов в качестве аргумента "numPartitions" при вызове функции myPartitioner в Scala.
Спасибо.