How do I correctly use the SQL HAVING clause with a COUNT column?
1 vote
/ April 5, 2020

I have the following table.

+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|         business_id|cool|               date|funny|           review_id|stars|                text|useful|             user_id|
+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|ujmEBvifdJM6h6RLv...|   0|2013-05-07 04:34:36|    1|Q1sbwvVQXV2734tPg...|  1.0|Total bill for th...|     6|hG7b0MtEbXx5QzbzE...|
|NZnhc2sEQy3RmzKTZ...|   0|2017-01-14 21:30:33|    0|GJXCdrto3ASJOqKeV...|  5.0|I *adore* Travis ...|     0|yXQM5uF2jS6es16SJ...|
|WTqjgwHlXbSFevF32...|   0|2016-11-09 20:09:03|    0|2TzJjDVDEuAW6MR5V...|  5.0|I have to say tha...|     3|n6-Gk65cPZL6Uz8qR...|

I am trying to:
- count text and user_id for each text (this should give me the number of occurrences of each text)
- group by text
- average the "cool" score
- keep all resulting records whose count is greater than 5
- and sort (order) the rows by the average "cool" score

I get an error. Since this is in PySpark, it is a bit hard to read; still, I suspect my weak SQL skills are the real problem. The code runs fine without the HAVING COUNT(text, user_id) > 5 line, so that is where I believe the issue lies. Any suggestions would be great! Thanks.

query = """    
SELECT 
        text, 
        COUNT(text,user_id) AS unique_count , 
        AVG(cool) AS avg_cool 
    FROM review
    GROUP BY text
    HAVING COUNT(text,user_id) > 5
    ORDER BY AVG(cool) DESC
"""
spark.sql(query).show(50)
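
For reference, a minimal setup sketch that registers the review view (the JSON path here is a placeholder; the view name and schema match the relation shown in the error plan below):

review_df = spark.read.json("reviews.json")  # placeholder path; any source with this schema works
review_df.createOrReplaceTempView("review")  # makes the data queryable as "review" in spark.sql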


Desired output

+--------------------+------------+--------+
|                text|unique_count|avg_cool|
+--------------------+------------+--------+
|In retrospect, I ...|          16|   506.0|
|The Wynn Hotel. O...|          14|   347.0|
|Nutter Butter, Ch...|          12|   290.0|
|It's perky! It's ...|          10|   268.0|

PySpark error

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
/usr/local/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:

/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:

Py4JJavaError: An error occurred while calling o24.sql.
: org.apache.spark.sql.AnalysisException: cannot resolve '`cool`' given input columns: [review.text, unique_count, avg_cool]; line 9 pos 17;
'Sort ['AVG('cool) DESC NULLS LAST], true
+- Project [text#190, unique_count#386L, avg_cool#387]
   +- Filter (count(text#190, user_id#192)#392L > cast(5 as bigint))
      +- Aggregate [text#190], [text#190, count(text#190, user_id#192) AS unique_count#386L, avg(cool#185L) AS avg_cool#387, count(text#190, user_id#192) AS count(text#190, user_id#192)#392L]
         +- SubqueryAlias `review`
            +- Relation[business_id#184,cool#185L,date#186,funny#187L,review_id#188,stars#189,text#190,useful#191L,user_id#192] json

    at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$3.applyOrElse(CheckAnalysis.scala:111)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$3.applyOrElse(CheckAnalysis.scala:108)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:280)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:280)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:279)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
    at org.apache.spark.sql.catalyst.trees.TreeNode.org$apache$spark$sql$catalyst$trees$TreeNode$$mapChild$2(TreeNode.scala:297)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4$$anonfun$apply$13.apply(TreeNode.scala:356)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:356)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:93)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:93)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:105)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:105)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:104)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:116)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1$2.apply(QueryPlan.scala:121)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:121)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:126)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:126)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:93)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:108)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:86)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:86)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:95)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$executeAndCheck$1.apply(Analyzer.scala:108)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$executeAndCheck$1.apply(Analyzer.scala:105)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:201)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105)
    at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:58)
    at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:56)
    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:48)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:78)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:642)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)


During handling of the above exception, another exception occurred:

AnalysisException                         Traceback (most recent call last)
<ipython-input-13-10aa09933a72> in <module>
      9     ORDER BY AVG(cool) DESC
     10 """
---> 11 spark.sql(query).show(50)

/usr/local/spark/python/pyspark/sql/session.py in sql(self, sqlQuery)
    765         [Row(f1=1, f2=u'row1'), Row(f1=2, f2=u'row2'), Row(f1=3, f2=u'row3')]
    766         """
--> 767         return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
    768 
    769     @since(2.0)

/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

/usr/local/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     67                                              e.java_exception.getStackTrace()))
     68             if s.startswith('org.apache.spark.sql.AnalysisException: '):
---> 69                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
     70             if s.startswith('org.apache.spark.sql.catalyst.analysis'):
     71                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)

AnalysisException: "cannot resolve '`cool`' given input columns: [review.text, unique_count, avg_cool]; line 9 pos 17;\n'Sort ['AVG('cool) DESC NULLS LAST], true\n+- Project [text#190, unique_count#386L, avg_cool#387]\n   +- Filter (count(text#190, user_id#192)#392L > cast(5 as bigint))\n      +- Aggregate [text#190], [text#190, count(text#190, user_id#192) AS unique_count#386L, avg(cool#185L) AS avg_cool#387, count(text#190, user_id#192) AS count(text#190, user_id#192)#392L]\n         +- SubqueryAlias `review`\n            +- Relation[business_id#184,cool#185L,date#186,funny#187L,review_id#188,stars#189,text#190,useful#191L,user_id#192] json\n"

1 Answer

1 vote
/ April 5, 2020

I'm not sure whether the problem is the having clause or the order by. I know some databases (Hive, for example) are picky about the expressions and aliases allowed in them. In this case the error message itself points at the ORDER BY: "cannot resolve '`cool`' given input columns: [review.text, unique_count, avg_cool]; line 9" — after the aggregation, the raw cool column is no longer visible to the sort, only the aggregated outputs are.

Try referring to the column aliases instead:

SELECT text, COUNT(text, user_id) AS unique_count, AVG(cool) AS avg_cool
FROM review
GROUP BY text
HAVING unique_count > 5
ORDER BY avg_cool DESC;
...
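
For comparison, the same aggregation through the DataFrame API sidesteps alias resolution entirely, because each step works on materialized columns. A sketch, assuming review_df is the DataFrame behind the review view (note: F.count("text") counts rows with a non-null text, whereas SQL's COUNT(text, user_id) requires both columns to be non-null):

from pyspark.sql import functions as F

result = (
    review_df
    .groupBy("text")
    .agg(
        F.count("text").alias("unique_count"),  # occurrences of each text
        F.avg("cool").alias("avg_cool"),        # average "cool" score
    )
    .filter(F.col("unique_count") > 5)   # the HAVING step
    .orderBy(F.col("avg_cool").desc())   # sort on the alias, now a real column
)
result.show(50)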