Перенос нескольких наборов данных на следующую итерацию в Apache Flink - PullRequest
0 голосов
/ 06 марта 2020

В исходном примере кластеризации Kmeans, предоставленном с Flink, каждая точка назначается новому центроиду в каждой итерации, и информация о том, какому центроиду была назначена точка, не переносится на следующую итерацию. Моя цель - перенести эту информацию на следующую итерацию.

Первым решением, которое я попытался, было присвоить каждую точку несуществующему центроиду с идентификатором 0 до l oop, а затем обновить его. DataSet через итерации. Так я бы поступил в обычном l oop, однако теперь я понимаю, что использование функции итерации во Flink не совсем то же самое, что использование обычного l oop. Код для этого показан ниже.

DataSet<Tuple2<Integer, Point>> clusteredPoints = nullClusteredPoints;

IterativeDataSet<Centroid> loop = centroids.iterate(iterations);

// Asssigning each point to the nearest centroid
clusteredPoints = clusteredPoints
        // compute closest centroid for each point
        .map(new SelectNearestCenter())
        .withBroadcastSet(loop, "centroids");

DataSet<Centroid> newCentroids = clusteredPoints
        // count and sum point coordinates for each centroid
        .map(new CountAppender())
        .groupBy(0).reduce(new CentroidAccumulator())
        // compute new centroids from point counts and coordinate sums
        .map(new CentroidAverager());

// feed new centroids back into next iteration
DataSet<Centroid> finalCentroids = loop.closeWith(newCentroids);

Я ожидал, что DataSet clusteredPoints будет использоваться на каждой итерации, а затем после последней итерации этот DataSet будет состоять из конечных кластеризованных точек. Однако при попытке выполнить это возникает следующее исключение:

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: A data set that is part of an iteration was used as a sink or action. Did you forget to close the iteration?

Другое решение, которое я пробовал, состояло в том, чтобы использовать дельта-итерации и помещать DataSet точек в набор решений для подачи на следующую итерацию. Это также не сработало, поскольку единственными операциями, которые разрешены в наборе решений, являются Join и CoGroup, в соответствии с приведенным ниже исключением.

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Error: The only operations allowed on the solution set are Join and CoGroup.

Третье решение, которое я пытался, было прочитать набор данных точек из диск в начале каждой итерации и записать их на диск в конце итерации (вероятно, это будет крайне неэффективно). Однако запись на диск возвращает DataSink, и, следовательно, первое исключение, отображаемое выше, возникает и для этого решения.

Являются ли лучшие решения, которые я мог бы попробовать? Или итерации Flink не поддерживают такой пример использования?

1 Ответ

0 голосов
/ 09 марта 2020

Итерация Flink в настоящее время поддерживает только один движущийся набор данных, так что он получает хорошие свойства времени выполнения, в которых все данные c хранятся в памяти, а движущийся набор данных передается в потоковом режиме. Теоретически, Flink может поддерживать больше, но во многих случаях эти приятные свойства не могут сохраняться.

В вашем случае вы можете решить проблему, соединив два набора данных в один centroidWithPoints = clusters где для каждого центроида вы также храните список точек.

Кроме того, вы можете использовать теговое объединение, чтобы объединить оба набора данных в один, а затем разделить его в начале следующей итерации.

...