Я использую клей AWS для выполнения кластеризации Kmeans в моем наборе данных.Я хочу найти не только метки кластеров, но и центры кластеров.Я не могу найти позже.
В приведенном ниже коде model.clusterCenters возвращает значение NULL.Кластеризация KMeans работает нормально и возвращает метку кластера, т. Е. Переменную clusterInstance .
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import com.amazonaws.services.glue.util.JsonOptions
import com.amazonaws.services.glue.{DynamicFrame, GlueContext}
import org.apache.spark.SparkContext
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
object Clustering {
case class ObjectDay(realnumber: Double, bnumber : Double, blockednumber: Double,
creationdate : String, fname : String, uniqueid : Long, registrationdate : String,
plusnumber : Double, cvalue : Double, hvalue : Double)
case class ClusterInfo( instance: Int, centers: String)
def main(args: Array[String]): Unit = {
val sc: SparkContext = new SparkContext()
val glueContext: GlueContext = new GlueContext(sc)
val spark: SparkSession = glueContext.getSparkSession
import spark.implicits._
// write your code here - start
// Data Catalog: database and table name
val dbName = "dbname"
val tblName = "raw"
val sqlText = "SELECT <columns removed> FROM viewname WHERE `creation_date` ="
// S3 location for output
val outputDir = "s3://blucket/path/"
// Read data into a DynamicFrame using the Data Catalog metadata
val rawDyf: DynamicFrame = glueContext.getCatalogSource(database = dbName, tableName = tblName).getDynamicFrame()
// get only single day data with only numbers
// Spark SQL on a Spark dataframe
val numberDf = rawDyf.toDF()
numberDf.createOrReplaceTempView("viewname")
def getDataViaSql(runDate : LocalDate): RDD[ObjectDay] ={
val data = spark.sql(s"${sqlText} '${runDate.toString}'")
data.as[ObjectDay].rdd
}
def getDenseVector(rddnumbers: RDD[ObjectDay]): RDD[linalg.Vector]={
rddnumbers.map(s => Vectors.dense(Array(s.realnumber, s.bnumber, s.blockednumber))).cache()
}
def getClusters( numbers: RDD[linalg.Vector] ): RDD[ClusterInfo] = {
// Trains a k-means model
val model: KMeansModel = KMeans.train(numbers, 2, 20)
val centers: Array[linalg.Vector] = model.clusterCenters
//put together unique_ids with cluster predictions
val clusters: RDD[Int] = model.predict(numbers)
clusters.map{ clusterInstance =>
ClusterInfo(clusterInstance.toInt, centers(clusterInstance).toJson)
}
}
def combineDataAndClusterInstances(rddnumbers : RDD[ObjectDay], clusterCenters: RDD[ClusterInfo]): DataFrame ={
val numbersWithCluster = rddnumbers.zip(clusterCenters)
numbersWithCluster.map(
x =>
(x._1.realnumber, x._1.bnumber, x._1.blockednumber, x._1.creationdate, x._1.fname,
x._1.uniqueid, x._1.registrationdate, x._1.plusnumber, x._1.cvalue, x._1.hvalue,
x._2.instance, x._2.centers)
)
.toDF("realnumber", "bnumber", "blockednumber", "creationdate",
"fname","uniqueid", "registrationdate", "plusnumber", "cvalue", "hvalue",
"clusterInstance", "clusterCenter")
}
def process(runDate : LocalDate): DataFrame = {
val rddnumbers = getDataViaSql( runDate)
val dense = getDenseVector(rddnumbers)
val clusterCenters = getClusters(dense)
combineDataAndClusterInstances(rddnumbers, clusterCenters)
}
val startdt = LocalDate.parse("2018-01-01", DateTimeFormatter.ofPattern("yyyy-MM-dd"))
val dfByDates = (0 to 240)
.map(days => startdt.plusDays(days))
.map(process(_))
val result = dfByDates.tail.fold(dfByDates.head)((accDF, newDF) => accDF.union(newDF))
val output = DynamicFrame(result, glueContext).withName(name="prediction")
// write your code here - end
glueContext.getSinkWithFormat(connectionType = "s3",
options = JsonOptions(Map("path" -> outputDir)), format = "csv").writeDynamicFrame(output)
}
}
Я могу успешно найти центры кластеров, используя библиотеку Python sklearn для тех же данных.
ОБНОВЛЕНО: отображается полный код Scala, выполняемый как задание Glue.Также я не получаю никаких ошибок при выполнении задания.У меня просто нет кластерных центров.
Чего мне не хватает?