почему сглаживать и собирать список ошибок в Scala?с не может разрешить символ - PullRequest
0 голосов
/ 21 февраля 2019

У меня есть набор данных, который я пытаюсь сгладить с помощью scala.

+---------+-----------+--------+
|visitorId|trackingIds|emailIds|
+---------+-----------+--------+
|     a   |       666b|      12|
|     7   |       c0b5|      45|
|     7   |       c0b4|      87|
|     a   |  666b,7p88|        |
+---------+-----------+--------+

Я пытаюсь получить фрейм данных, сгруппированный по идентификатору посетителя.Ниже приведен формат

+---------+---------------------+--------+
|visitorId|   trackingIds       |emailIds|
+---------+---------------------+--------+
|     a   |       666b,666b,7p88|   12,87|
|     7   |       c0b4,c0b5     |      45|       
+---------+---------------------+--------+

Мой код:

object flatten_data{


  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder()
      .master("local[5]")
      .appName("Flatten_DF")
      .enableHiveSupport()
      .getOrCreate()

       val df =  spark.read.format("csv")
      .option("header","true")
      .option("delimiter",",")
      .load("/home/cloudera/Desktop/data.txt")
    print(df.show())
    val flattened = df.groupBy("visitorID").agg(collect_list("trackingIds"))
  }
}

Я использую IntelliJ Idea и получаю сообщение об ошибке в "collect_list".Я прочитал много решений по stackoverflow, где люди спрашивали о том, как сгладить и groupbykey и использовали один и тот же collect_list.Я не уверен, почему это не работает для меня.Это из-за IntelliJ?

Ответы [ 2 ]

0 голосов
/ 21 февраля 2019

Я переработал ваш код, и похоже, что он работает:

import org.apache.spark.sql.SparkSession
   import org.apache.spark.sql.functions._

   object flatten_data{

   def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().master("local").appName("test").getOrCreate()
    import spark.implicits._
    val someDF = Seq(
      ("a", "666b",12),
      ("7", "c0b5",45),
      ("7", "666b,7p88",10)
    ).toDF("visitorId","trackingIds","emailIds")


    someDF.groupBy("visitorID").agg(collect_list("trackingIds")).show()
        }
    }
0 голосов
/ 21 февраля 2019

collect_list - это метод, определенный в объекте org.apache.spark.sql.functions, поэтому вам необходимо импортировать его:

import org.apache.spark.sql.functions.collect_list

В качестве альтернативы, вы можете импортировать весь объект, тогда вы сможете использоватьоттуда и другие функции:

import org.apache.spark.sql.functions._

Наконец, лично я предпочитаю подход * импортировать functions как f и использовать квалифицированные вызовы:

import org.apache.spark.sql.{functions => f}

agg(f.collect_list(...))

Таким образомглобальное пространство имен внутри файла не загрязняется целым рядом функций, определенных в functions.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...