Flink Custom Partition Функция - PullRequest
0 голосов
/ 15 января 2019

Я использую Scala на Flink с DataSet API. Я хочу перераспределить мои данные по узлам. В Spark есть функция, которая позволяет пользователю переразбивать данные с заданным параметром numberOfPartitions ( link ), и я считаю, что Flink не поддерживает такую ​​функцию. Таким образом, я хотел достичь этого путем реализации пользовательской функции разбиения.

Мои данные имеют тип DataSet (Double, SparseVector) Пример строки из данных:

(1.0 SparseVector((2024,1.0), (2025,1.0), (2030,1.0), (2045,1.0), (2046,1.41), (2063,1.0), (2072,1.0), (3031,1.0), (3032,1.0), (4757,1.0), (4790,1.0), (177196,1.0), (177197,0.301), (177199,1.0), (177202,1.0), (1544177,1.0), (1544178,1.0), (1544179,1.0), (1654031,1.0), (1654190,1.0), (1654191,1.0), (1654192,1.0), (1654193,1.0), (1654194,1.0), (1654212,1.0), (1654237,1.0), (1654238,1.0)))

Так как мой "Double" является двоичным (1 или -1), я хочу разделить мои данные в соответствии с длиной SparceVector. Мой пользовательский разделитель выглядит следующим образом:

class myPartitioner extends Partitioner[SparseVector]
{ 
    override def partition(key: SparseVector, numPartitions: Int): Int = {
         key.size % numPartitions
    } 
}

Я называю этот пользовательский разделитель следующим образом:

data.partitionCustom(new myPartitioner(),1)

Может кто-нибудь помочь мне понять, как указать количество разделов в качестве аргумента "numPartitions" при вызове функции myPartitioner в Scala.

Спасибо.

Ответы [ 3 ]

0 голосов
/ 15 января 2019

В режиме flink вы можете определить setParallelism для одного оператора или для всех операторов, используя enviornment.setParallelism. Я надеюсь, что ссылка поможет вам.

0 голосов
/ 16 января 2019

Я предполагаю, что вы используете длину SparseVector просто для того, чтобы иметь что-то, что дает вам относительно случайные значения для использования при разбиении. Если это правда, тогда вы можете просто сделать DataSet.rebalance(). Если вы будете следовать этому любому оператору (включая Sink), где вы устанавливаете параллелизм на numPartitions, то вы должны получить красиво перераспределенные данные.

Но ваше описание ...want to re-partition my data across the nodes заставляет меня думать, что вы пытаетесь применить концепцию Spark RDD s к Flink, что не совсем верно. Например. при условии, что у вас есть numPartition параллельные операторы, обрабатывающие (повторно разделенные) данные в вашем DataSet, тогда эти операторы будут работать в слотах, предоставляемых доступными TaskManager, и эти слоты могут быть или не быть на разных физических серверах.

0 голосов
/ 15 января 2019

Spark использует функцию перераспределения (n: Int) для перераспределения данных на n разделов, которые будут обрабатываться n задачами. С моей точки зрения, это включает в себя два изменения: перераспределение данных и количество последующих задач.

Поэтому в Apache Flink я думаю, что Partitioner сопоставлен с перераспределением данных, а параллелизм сопоставлен с числом последующих задач, что означает, что вы можете использовать setParallelism для определения «numPartitions».

...