KMeansModel.clusterCenters возвращает NULL - PullRequest
0 голосов
/ 25 сентября 2018

Я использую клей AWS для выполнения кластеризации Kmeans в моем наборе данных.Я хочу найти не только метки кластеров, но и центры кластеров.Я не могу найти позже.

В приведенном ниже коде model.clusterCenters возвращает значение NULL.Кластеризация KMeans работает нормально и возвращает метку кластера, т. Е. Переменную clusterInstance .

import java.time.LocalDate
import java.time.format.DateTimeFormatter

import com.amazonaws.services.glue.util.JsonOptions
import com.amazonaws.services.glue.{DynamicFrame, GlueContext}
import org.apache.spark.SparkContext
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

object Clustering {
  case class ObjectDay(realnumber: Double, bnumber : Double, blockednumber: Double,
                    creationdate : String, fname : String, uniqueid : Long, registrationdate : String,
                    plusnumber : Double, cvalue : Double, hvalue : Double)
  case class ClusterInfo( instance: Int, centers: String)

  def main(args: Array[String]): Unit = {
    val sc: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(sc)
    val spark: SparkSession = glueContext.getSparkSession
    import spark.implicits._

    // write your code here - start
    // Data Catalog: database and table name
    val dbName = "dbname"
    val tblName = "raw"
    val sqlText = "SELECT <columns removed> FROM viewname WHERE `creation_date` ="

    // S3 location for output
    val outputDir = "s3://blucket/path/"

    // Read data into a DynamicFrame using the Data Catalog metadata
    val rawDyf: DynamicFrame = glueContext.getCatalogSource(database = dbName, tableName = tblName).getDynamicFrame()

    // get only single day data with only numbers
    // Spark SQL on a Spark dataframe
    val numberDf = rawDyf.toDF()
    numberDf.createOrReplaceTempView("viewname")

    def getDataViaSql(runDate : LocalDate): RDD[ObjectDay] ={
      val data = spark.sql(s"${sqlText} '${runDate.toString}'")
      data.as[ObjectDay].rdd
    }

    def getDenseVector(rddnumbers: RDD[ObjectDay]): RDD[linalg.Vector]={
      rddnumbers.map(s => Vectors.dense(Array(s.realnumber, s.bnumber, s.blockednumber))).cache()
    }

    def getClusters( numbers: RDD[linalg.Vector] ): RDD[ClusterInfo]  = {
      // Trains a k-means model
      val model: KMeansModel = KMeans.train(numbers, 2, 20)
      val centers: Array[linalg.Vector] = model.clusterCenters

      //put together unique_ids with cluster predictions
      val clusters: RDD[Int] = model.predict(numbers)

      clusters.map{ clusterInstance =>
          ClusterInfo(clusterInstance.toInt, centers(clusterInstance).toJson)
      }
    }
    def combineDataAndClusterInstances(rddnumbers : RDD[ObjectDay], clusterCenters: RDD[ClusterInfo]): DataFrame ={
      val numbersWithCluster = rddnumbers.zip(clusterCenters)
        numbersWithCluster.map(
          x =>
            (x._1.realnumber, x._1.bnumber, x._1.blockednumber, x._1.creationdate, x._1.fname,
            x._1.uniqueid, x._1.registrationdate, x._1.plusnumber, x._1.cvalue, x._1.hvalue,
              x._2.instance, x._2.centers)
        )
        .toDF("realnumber", "bnumber", "blockednumber", "creationdate",
        "fname","uniqueid", "registrationdate", "plusnumber", "cvalue", "hvalue",
        "clusterInstance", "clusterCenter")
    }
    def process(runDate : LocalDate): DataFrame = {
      val rddnumbers = getDataViaSql( runDate)
      val dense = getDenseVector(rddnumbers)
      val clusterCenters = getClusters(dense)
      combineDataAndClusterInstances(rddnumbers, clusterCenters)
    }

    val startdt = LocalDate.parse("2018-01-01", DateTimeFormatter.ofPattern("yyyy-MM-dd"))

    val dfByDates = (0 to 240)
      .map(days => startdt.plusDays(days))
      .map(process(_))

    val result = dfByDates.tail.fold(dfByDates.head)((accDF, newDF) => accDF.union(newDF))

    val output = DynamicFrame(result, glueContext).withName(name="prediction")

    // write your code here - end
    glueContext.getSinkWithFormat(connectionType = "s3",
      options = JsonOptions(Map("path" -> outputDir)), format = "csv").writeDynamicFrame(output)
  }
}

Я могу успешно найти центры кластеров, используя библиотеку Python sklearn для тех же данных.

ОБНОВЛЕНО: отображается полный код Scala, выполняемый как задание Glue.Также я не получаю никаких ошибок при выполнении задания.У меня просто нет кластерных центров.

Чего мне не хватает?

1 Ответ

0 голосов
/ 26 сентября 2018

Nevermind.Он генерирует центры кластеров.

Я до сих пор не видел выходные файлы S3.

Я запускал Glue Crawler и смотрел результаты в AWS Athena.Искатель создал тип данных столбца struct или array для столбца clustercenter, и Athena не удалось проанализировать и прочитать JSON, сохраненный в виде строки в выходных данных CSV.

Извините, что беспокою.

...