Преобразовать строки в несколько строк в Spark Scala - PullRequest
0 голосов
/ 15 февраля 2020

У меня проблема с преобразованием одной строки в несколько строк. Это основано на другом отображении, которое у меня есть. Я попытался привести пример ниже.

Предположим, у меня есть файл паркета со схемой ниже

ColA, ColB, ColC, Size, User

Мне нужно объединить вышеуказанные данные в несколько строк на основе карты поиска. Предположим, у меня есть stati c map

ColA, ColB, Sum(Size)
ColB, ColC, Distinct (User)
ColA, ColC, Sum(Size)

Это означает, что одну строку во входном RDD необходимо преобразовать в 3 агрегатные. Я считаю, что RDD - это способ go с FlatMapPair, но я не уверен, как go об этом.

Я также в порядке, чтобы объединить столбцы в один ключ, что-то вроде ColA_ColB et c.

Для создания нескольких агрегатов из одних и тех же данных я начал с чего-то вроде этого


val keyData: PairFunction[Row, String, Long] = new PairFunction[Row, String, Long]() {
    override def call(x: Row) = {
      (x.getString(1),x.getLong(5))
    }
  }

val ip15M = spark.read.parquet("a.parquet").toJavaRDD

val pairs = ip15M.mapToPair(keyData)

java.util.List[(String, Long)] = [(ios,22), (ios,23), (ios,10), (ios,37), (ios,26), (web,52), (web,1)]

Я считаю, что мне нужно сделать flatmaptopair вместо mapToPair. Аналогичным образом я пытался

  val FlatMapData: PairFlatMapFunction[Row, String, Long] = new PairFlatMapFunction[Row, String, Long]() {
    override def call(x: Row) = {
      (x.getString(1),x.getLong(5))
    }
  }

, но он выдает ошибку

Expression of type (String, Long) doesn't conform to expected type util.Iterator[(String, Long)]

Любая помощь приветствуется. Пожалуйста, дайте мне знать, если мне нужно добавить больше деталей.

1 Ответ

0 голосов
/ 15 февраля 2020

результат должен иметь только 3 столбца? Я имею в виду col1, col2, col3 (общий результат). Второй агрегат - это четкое количество пользователей? (Я предполагаю, что да).

Если это так, вы можете в основном создать 3 фрейма данных, а затем объединить их. Что-то на пути:

val df1 = spark. sql ("выберите colA в качестве col1, colB в качестве col2, сумму (размер) в качестве группы colAgg по colA, colB")

val df2 = spark. sql ("выберите colB в качестве col1, col C в качестве col2, Distinct (User) в качестве группы colAgg по colB, col C")

val df3 = spark. sql ("выберите colA как col1, col C как col2, sum (Size) как colAgg group по colA, col C")

df1.union (df2) .union (df3)

...