Хеширование с учетом локальных особенностей в Spark для одного DataFrame - PullRequest
1 голос
/ 03 февраля 2020

Я прочитал раздел Spark о хешировании с учетом локальных особенностей и до сих пор не понимаю некоторые из них:

https://spark.apache.org/docs/latest/ml-features.html#locality-sensitive-hashing

И есть случайная проекция с группировкой Пример для двух DataFrames. У меня есть один простой, пространственный набор данных точек, например:

enter image description here

(Конечно, позже у меня будут миллионы точек), и DataFrame выглядит следующим образом:

  X        Y
id                  
1   11.6133  48.1075
2   11.6142  48.1066
3   11.6108  48.1061
4   11.6207  48.1192
5   11.6221  48.1223
6   11.5969  48.1276
7   11.5995  48.1258
8   11.6127  48.1066
9   11.6430  48.1275
10  11.6368  48.1278
11  11.5930  48.1156

Мой вопрос таков: как расположить точки, близкие друг к другу, к одним и тем же группам, чтобы в моем исходном кадре данных был дополнительный столбец с этими хэшами / группами?

Best, Marcin

Ответы [ 2 ]

1 голос
/ 03 февраля 2020

Вот код scala, который выполняет LSH. По сути, l sh нужен собранный вектор, который вы можете построить с помощью VectorAssembler.

// contructing the dataframe
val data= """1   11.6133  48.1075
2   11.6142  48.1066
3   11.6108  48.1061
4   11.6207  48.1192
5   11.6221  48.1223
6   11.5969  48.1276
7   11.5995  48.1258
8   11.6127  48.1066
9   11.6430  48.1275
10  11.6368  48.1278
11  11.5930  48.1156"""
val df = data
    .split("\\s*\\n\\s*")
    .map( _.split("\\s+") match {
        case Array(a, b, c) => (a.toInt,b.toDouble,c.toDouble)
    })
    .toSeq
    .toDF("id", "X", "Y")

val assembler = new VectorAssembler()
    .setInputCols(Array("X", "Y"))
    .setOutputCol("v")
val df2 = assembler.transform(df)
val lsh = new BucketedRandomProjectionLSH()
    .setInputCol("v")
    .setBucketLength(1e-3) // change that according to your use case
    .setOutputCol("lsh")
val result = lsh.fit(df2).transform(df2).orderBy("lsh")

// the lsh is in an array of vectors. To extract the double, we can use
// getItem for the array and a UDF for the vector.
val extract = udf((vector : org.apache.spark.ml.linalg.Vector) => vector(0))
result.withColumn("lsh", extract(col("lsh").getItem(0))).show(false)
1 голос
/ 03 февраля 2020

BucketedRandomProjectionLSH делает именно то, что вам нужно. Результатом ha sh для каждой точки может быть групповое значение. Единственная проблема заключается в выборе правильного радиуса, который будет устанавливать размер каждого сегмента. Используйте .setBucketLength(0.02), чтобы установить радиус. Другая небольшая проблема - извлечь ha sh из вектора в столбец. Я использую этот метод: Spark Scala: Как преобразовать Dataframe [vector] в DataFrame [f1: Double, ..., fn: Double)]

Пример с вашими данными

import org.apache.spark.ml.feature.BucketedRandomProjectionLSH
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.linalg.Vector

val dfA = spark.createDataFrame(Seq(
  (1, Vectors.dense(11.6133, 48.1075)),
  (2, Vectors.dense(11.6142, 48.1066)),
  (3, Vectors.dense(11.6108, 48.1061)),
  (4, Vectors.dense(11.6207, 48.1192)),
  (5, Vectors.dense(11.6221, 48.1223)),
  (6, Vectors.dense(11.5969, 48.1276)),
  (7, Vectors.dense(11.5995, 48.1258)),
  (8, Vectors.dense(11.6127, 48.1066)),
  (9, Vectors.dense(11.6430, 48.1275)),
  (10, Vectors.dense(11.6368, 48.1278)),
  (11, Vectors.dense(11.5930, 48.1156))
  )).toDF("id", "coord")

val brp = new BucketedRandomProjectionLSH()
  .setBucketLength(0.02)
  .setNumHashTables(1)
  .setInputCol("coord")
  .setOutputCol("hashes")
val model = brp.fit(dfA)

val res = model.transform(dfA)

val vecToSeq = udf((v: Vector) => v.toArray).asNondeterministic

res.select ($"id", vecToSeq($"hashes"(0))(0) as "bucket").show

Выходные данные дают 2 группы для радиуса 0,02:

  +---+------+
  | id|bucket|
  +---+------+
  |  1|2473.0|
  |  2|2473.0|
  |  3|2473.0|
  |  4|2474.0|
  |  5|2474.0|
  |  6|2474.0|
  |  7|2474.0|
  |  8|2473.0|
  |  9|2474.0|
  | 10|2474.0|
  | 11|2473.0|
...