В исходном примере кластеризации Kmeans, предоставленном с Flink, каждая точка назначается новому центроиду в каждой итерации, и информация о том, какому центроиду была назначена точка, не переносится на следующую итерацию. Моя цель - перенести эту информацию на следующую итерацию.
Первым решением, которое я попытался, было присвоить каждую точку несуществующему центроиду с идентификатором 0 до l oop, а затем обновить его. DataSet через итерации. Так я бы поступил в обычном l oop, однако теперь я понимаю, что использование функции итерации во Flink не совсем то же самое, что использование обычного l oop. Код для этого показан ниже.
DataSet<Tuple2<Integer, Point>> clusteredPoints = nullClusteredPoints;
IterativeDataSet<Centroid> loop = centroids.iterate(iterations);
// Asssigning each point to the nearest centroid
clusteredPoints = clusteredPoints
// compute closest centroid for each point
.map(new SelectNearestCenter())
.withBroadcastSet(loop, "centroids");
DataSet<Centroid> newCentroids = clusteredPoints
// count and sum point coordinates for each centroid
.map(new CountAppender())
.groupBy(0).reduce(new CentroidAccumulator())
// compute new centroids from point counts and coordinate sums
.map(new CentroidAverager());
// feed new centroids back into next iteration
DataSet<Centroid> finalCentroids = loop.closeWith(newCentroids);
Я ожидал, что DataSet clusteredPoints
будет использоваться на каждой итерации, а затем после последней итерации этот DataSet будет состоять из конечных кластеризованных точек. Однако при попытке выполнить это возникает следующее исключение:
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: A data set that is part of an iteration was used as a sink or action. Did you forget to close the iteration?
Другое решение, которое я пробовал, состояло в том, чтобы использовать дельта-итерации и помещать DataSet точек в набор решений для подачи на следующую итерацию. Это также не сработало, поскольку единственными операциями, которые разрешены в наборе решений, являются Join и CoGroup, в соответствии с приведенным ниже исключением.
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Error: The only operations allowed on the solution set are Join and CoGroup.
Третье решение, которое я пытался, было прочитать набор данных точек из диск в начале каждой итерации и записать их на диск в конце итерации (вероятно, это будет крайне неэффективно). Однако запись на диск возвращает DataSink, и, следовательно, первое исключение, отображаемое выше, возникает и для этого решения.
Являются ли лучшие решения, которые я мог бы попробовать? Или итерации Flink не поддерживают такой пример использования?