Spark [Scala]: неоднозначное имя столбца groupBy - PullRequest
0 голосов
/ 11 марта 2019

Итак, во время тестирования я получаю это сообщение об ошибке:

org.apache.spark.sql.AnalysisException: Reference 'from' is ambiguous, could be: from, from.;

во время тестирования, но не при запуске детали в spark-shell ...?

Яделая перекрестное соединение на следующих фреймах данных:

scala> timeSpanDF.show
+----------+----------+
|      from|        to|
+----------+----------+
|2018-01-01|2018-02-01|
|2018-01-01|2018-03-01|
|2018-02-01|2018-03-01|
+----------+----------+


scala> df.show
+------------+----------+--------+-----+--------------------+
|pressroom_id|     month|category|event|               email|
+------------+----------+--------+-----+--------------------+
|           1|2017-01-01| contact| open|somebody@example.com|
|           1|2018-01-01| contact| open|     me1@example.com|
|           1|2018-02-01| contact| open|     me1@example.com|
|           1|2018-02-01| contact| open|     me1@example.com|
|           1|2018-01-01| contact| open|     you@example.com|
|           1|2018-03-01| contact| open|     etc@example.com|
|           1|2018-02-01| contact| open|     me2@example.com|
|           1|2018-02-01| contact| open|     me2@example.com|
|           2|2018-01-01| contact| open|     me1@example.com|
+------------+----------+--------+-----+--------------------+

, поэтому я делаю это

val joinedDF = timeSpansDF
    .crossJoin(df)
    .filter(
        df("month") >= timeSpansDF("from") 
        && df("month") < timeSpansDF("to")
    )
    .distinct

и получаю это

scala> joinedDF.show
+----------+----------+------------+----------+--------+-----+---------------+
|      from|        to|pressroom_id|     month|category|event|          email|
+----------+----------+------------+----------+--------+-----+---------------+
|2018-01-01|2018-03-01|           2|2018-01-01| contact| open|me1@example.com|
|2018-02-01|2018-03-01|           1|2018-02-01| contact| open|me1@example.com|
|2018-02-01|2018-03-01|           1|2018-02-01| contact| open|me2@example.com|
|2018-01-01|2018-03-01|           1|2018-01-01| contact| open|me1@example.com|
|2018-01-01|2018-02-01|           1|2018-01-01| contact| open|me1@example.com|
|2018-01-01|2018-03-01|           1|2018-02-01| contact| open|me2@example.com|
|2018-01-01|2018-02-01|           2|2018-01-01| contact| open|me1@example.com|
|2018-01-01|2018-03-01|           1|2018-01-01| contact| open|you@example.com|
|2018-01-01|2018-03-01|           1|2018-02-01| contact| open|me1@example.com|
|2018-01-01|2018-02-01|           1|2018-01-01| contact| open|you@example.com|
+----------+----------+------------+----------+--------+-----+---------------+

Затем позже я хочу агрегировать этотаблица, вот так, и вот где я получаю странное сообщение:

joinedDF.where(col("category") === lit(category) && col("event") === lit("open"))
    .groupBy("pressroom_id", "from", "to")
    .agg(count("email").cast("integer").as("something"))

, которое указывает на groupBy.Странная часть в том, что это работает в оболочке, но когда эти операции помещаются в функции и тестируются с помощью scalaTest, они выдают ошибку?

Whats up doc?

Ответы [ 2 ]

0 голосов
/ 11 марта 2019

Я не эксперт по Scala, но я администратор базы данных.

Я подозреваю, что ваша проблема возникла из-за использования зарезервированного слова SQL from в качестве имени столбца, поскольку трассировка стека показывает, что Исключение происходит из модуля Spark SQL: org.apache.spark.sql.AnalysisException.

Или:

  1. Попробуйте изменить имя столбца на то, что не является зарезервированным словом; или

  2. Полностью квалифицировать имя столбца как joinedDF.from.

NB. Ваш второй фрагмент кода относится к фрейму данных с именем timeSpanDF, а ваш третий - к timeSpansDF (множественное число).

Редактировать: Как новый член сообщества, у меня недостаточно репутации, чтобы оставлять комментарии к ответу @ KZapagol, но я считаю, что суть его ответа заключается в том, что в оригинальном постере есть опечатка joinedDF.where пункт: col("category") === lit(category) => col("category") === lit("contact").

0 голосов
/ 11 марта 2019

Поскольку у меня нет кода для генерации joinDF, и, следовательно, я сам подготовил Dataframe для генерации joinDF. Я протестировал его в ScalaTest, и он работает нормально.

Пожалуйста, обновите ваш код, как показано ниже.

val df = Seq(("2018-01-01", "2018-03-01", 2,"contact","open","me1@example.com"),
                ("2018-02-01","2018-03-01",1, "contact","open","me1@example.com"),
                ("2018-01-01","2018-03-01",1, "contact","open","you@example.com"),
                ("2018-02-01","2018-03-01",1, "contact","open","me1@example.com"),
                ("2018-01-01","2018-02-01",1, "contact","open","me1@example.com"),
                ("2018-01-01","2018-02-01", 1, "contact","open","you@example.com")).
      toDF("from", "to", "pressroom_id","category","event","email")

df.show() 

+----------+----------+------------+--------+-----+---------------+
|      from|        to|pressroom_id|category|event|          email|
+----------+----------+------------+--------+-----+---------------+
|2018-01-01|2018-03-01|           2| contact| open|me1@example.com|
|2018-02-01|2018-03-01|           1| contact| open|me1@example.com|
|2018-01-01|2018-03-01|           1| contact| open|you@example.com|
|2018-02-01|2018-03-01|           1| contact| open|me1@example.com|
|2018-01-01|2018-02-01|           1| contact| open|me1@example.com|
|2018-01-01|2018-02-01|           1| contact| open|you@example.com|
+----------+----------+------------+--------+-----+---------------+

val df1 = df.where(col("category") === lit("contact") && col("event") === lit("open"))
      .groupBy("pressroom_id", "from", "to")
      .agg(count("email").cast("integer").as("something"))

df1.show()



   +------------+----------+----------+---------+
|pressroom_id|      from|        to|something|
+------------+----------+----------+---------+
|           2|2018-01-01|2018-03-01|        1|
|           1|2018-01-01|2018-03-01|        1|
|           1|2018-02-01|2018-03-01|        2|
|           1|2018-01-01|2018-02-01|        2|
+------------+----------+----------+---------+

Я добавил оператор импорта в свой код.

import org.apache.spark.sql.functions._

Надеюсь, это поможет!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...