Я пытаюсь реализовать алгоритм DBSCAN на Spark, поэтому я следую статье Параллельный алгоритм DBSCAN на основе Spark .
Они предлагают алгоритм с 4 основными шагами:
- Раздел данных
- Вычисление локального DBSCAN
- Объединение раздела данных
- Глобальная кластеризация
Итак, я реализую второй шаг с использованием GraphX, и псевдокод выглядит примерно так:
- Выберите произвольную точку
p
в текущем разделе
- Вычислить
N_{e}
и, если N_{e} >= minPts
, пометить p как точку ядра, в противном случае как точку шума.
- Если p является базовой точкой, то сделать кластер
c
на p
, добавив все точки, принадлежащие кластеру c
, в список для рекурсивных вызовов.
- ...
А вот и мой код (я в курсе, что он не работает):
def dataPartition() : Graph[Array[String], Int] = {
graph.partitionBy(PartitionStrategy.RandomVertexCut)
}
def computingLocalDBSCAN() : Unit = {
graph = dataPartition()
//val neighbors = graph.mapVertices((id, attr) => localDBSCANMap(id, attr))
}
def localDBSCANMap(id: VertexId, attr:Array[String], cluster:Int):Unit = {
val neighbors = graph.collectNeighbors(EdgeDirection.Out).lookup(id)
if (neighbors.size >= eps) {
attr(0) = PointType.Core.toString
attr(1) = cluster.toString
} else {
attr(0) = PointType.Noise.toString
}
neighbors.foreach(it => {
for (item <- it) {
localDBSCANMap(item._1, item._2, cluster)
}
})
}
У меня есть несколько вопросов:
- Как я могу изменить значение одного атрибута вершины? Я понимаю, что вершины неизменны, но я хотел бы отметить узлы с
Noise
, Core
, Border
или Unclassified
.
- Как я могу выбрать случайный узел внутри раздела? Потому что моя проблема с методом map заключается в том, что я должен изменить значения в то же время, когда я пересекаю график.
- Как я могу вызвать рекурсивный метод и изменить значения атрибута? в то же время?