Свести вложенные кортежи в RDD - PullRequest
0 голосов
/ 30 мая 2018

Я использую Spark SQL для извлечения строк из таблицы.Некоторые из этих данных повторяются, и я пытаюсь подсчитать количество вхождений.По сути, я пытаюсь выполнить базовый пример «подсчета слов», но вместо того, чтобы мои данные имели вид: (Word : String, Count : Int), у нас есть строка данных, заменяющая слово / строку.

Подробнеев частности, мои данные выглядят так: RDD[((row), count)], где строка извлекается из таблицы sql и содержит строки, двойные числа, целые и т. д.

Это в форме RDD, потому что я хочу использовать reduceByKey.См .: Избегайте groupByKey .Это пара (Key, Value) с очень длинным ключом (некоторая строка из базы данных sql) и его значением является «количество слов».

Мое приложение делает это:

myDataframe
    // Append a 1 to each row
    .map(row => (row, 1))
    // Convert to RDD so we can use the reduceByKey method
    .rdd
    // Add up the 1's corresponding to matching keys
    .reduceByKey(_ + _)
    //Filter by rows that show up more than 10 times
    .filter(_._2 > 100)

    ...

Теперь предположим, что мои данные строки содержат (string, double, int).Здесь я хочу распаковать свои данные из RDD[((string, double, int), count)] в RDD[(string, double, int, count)], чтобы я мог в конечном итоге сохранить эти данные в другую таблицу SQL.

Есть ли какой-то метод, который позволяет мне распаковать содержимое этого.... вложенный кортеж ... что-то вроде этого?

Мое решение состояло в том, чтобы "распаковать" элементы RDD следующим образом: .map(row => (row._1._1, row._1._2, row._1._3, row._2))

Но должен быть лучший способ!Если я решу извлечь больше элементов из строки, мне придется изменить этот .map() вызов.

Спасибо!

Ответы [ 2 ]

0 голосов
/ 30 мая 2018

Вам НЕ нужно возвращаться к использованию RDD;Статья, на которую вы ссылаетесь правильно, предупреждает об использовании RDD.groupByKey, но ее не следует применять к DataFrame groupBy.Безопасно (и эффективно) использовать groupBy в DataFrame!Подробнее здесь .

Таким образом, для группировки по всем столбцам DataFrame, подсчета вхождений каждой группы и фильтрации для групп с количеством> 10 вы можете просто использовать:

df.groupBy(df.columns.map(col): _*) // alternatively: df.groupBy(df.columns.head, df.columns.tail: _*)
  .count()
  .filter($"count" > 10)

Результат имеет схему, аналогичную входной, с дополнительным столбцом count long.

0 голосов
/ 30 мая 2018

Вы можете использовать Row toSeq и fromSeq, как в следующем примере:

val df = Seq(
  ("a", 10.0, 1),
  ("a", 10.0, 1),
  ("b", 20.0, 2),
  ("c", 30.0, 3),
  ("c", 30.0, 3)
).toDF("c1", "c2", "c3")

import org.apache.spark.sql.Row

df.rdd.
  map((_, 1)).
  reduceByKey(_ + _).
  filter(_._2 > 1).
  map{
    case (row: Row, count: Int) => Row.fromSeq(row.toSeq :+ count)
  }
// res1: Array[org.apache.spark.sql.Row] = Array([a,10.0,1,2], [c,30.0,3,2])
...