Коррелированный подзапрос сгруппированного выражения - TreeNodeException: Binding атрибут, tree: count (1) # 382L - PullRequest
0 голосов
/ 27 ноября 2018

Допустим, я пытаюсь сделать некоторую статистику по некоторым выборочным данным, которые состоят из пар (значения a и b).Некоторые пары существуют несколько раз, другие нет.

spark.createDataFrame([
    Row(a=5, b=10), Row(a=5, b=10), Row(a=5, b=10),
    Row(a=6, b=10), Row(a=6, b=10), Row(a=6, b=10), Row(a=6, b=10), Row(a=6, b=10), Row(a=6, b=10),
    Row(a=5, b=11), Row(a=5, b=11),
    Row(a=6, b=12), Row(a=6, b=12), Row(a=6, b=12), Row(a=6, b=12),
    Row(a=5, b=5), Row(a=5, b=5), Row(a=5, b=5), Row(a=5, b=5), Row(a=5, b=5), Row(a=5, b=5), Row(a=5, b=5),
]).registerTempTable('mydata')

Во-первых, я просто подсчитываю, как часто существует каждая пара:

spark.sql('''
SELECT a, b,
    COUNT(*) as count
FROM mydata AS o
GROUP BY a, b
''').show()

Вывод:

+---+---+-----+
|  a|  b|count|
+---+---+-----+
|  6| 12|    4|
|  5|  5|    7|
|  6| 10|    6|
|  5| 10|    3|
|  5| 11|    2|
+---+---+-----+

Теперь я хочу добавить дополнительный столбец, содержащий процент того, как часто существует пара, по сравнению с общим числом пар с одинаковым значением для a.Для этого я попытался добавить коррелированный подзапрос, вычисляющий общее количество:

spark.sql('''
SELECT a, b,
    COUNT(*) as count,
    (COUNT(*) / (
        SELECT COUNT(*) FROM mydata AS i WHERE o.a = i.a
    )) as percentage
FROM mydata AS o
GROUP BY a, b
''').show()

Что я ожидаю:

+---+---+-----+----------+
|  a|  b|count|percentage|
+---+---+-----+----------+
|  6| 12|    4|       0.4|  --> 10 pairs exist with a=6 --> 4/10 = 0.4
|  5|  5|    7|    0.5833|  --> 12 pairs exist with a=5 --> 7/12  =0.5833
|  6| 10|    6|       0.6|  --> ...
|  5| 10|    3|      0.25|
|  5| 11|    2|    0.1666|
+---+---+-----+----------+

Что я получаю:

py4j.protocol.Py4JJavaError: An error occurred while calling o371.showString.
: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: count(1)#382L
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
    at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:91)
    at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:90)
[...]
Caused by: java.lang.RuntimeException: Couldn't find count(1)#382L in [a#305L,b#306L,count(1)#379L]
    at scala.sys.package$.error(package.scala:27)
    at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1$$anonfun$applyOrElse$1.apply(BoundAttribute.scala:97)
    at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1$$anonfun$applyOrElse$1.apply(BoundAttribute.scala:91)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
    ... 80 more

Это звучит немного странно - каким-то образом pyspark хочет получить доступ к счетчику внутреннего соединения?

Что-то не так с моим синтаксисом подзапроса?

1 Ответ

0 голосов
/ 27 ноября 2018

Из первой таблицы вы можете рассчитать процент с помощью оконной функции;sum(count) over (partition by a) будет суммировать count на a, при этом длина результата не уменьшается, что позволяет вам делить на другой столбец напрямую:

spark.sql('''
    SELECT a, b,
        COUNT(*) as count
    FROM mydata AS o
    GROUP BY a, b
''').registerTempTable('count')

spark.sql('''
    SELECT *, 
           count / sum(count) over (partition by a) as percentage 
    FROM count
''').show()
+---+---+-----+-------------------+
|  a|  b|count|         percentage|
+---+---+-----+-------------------+
|  6| 12|    4|                0.4|
|  6| 10|    6|                0.6|
|  5|  5|    7| 0.5833333333333334|
|  5| 10|    3|               0.25|
|  5| 11|    2|0.16666666666666666|
+---+---+-----+-------------------+
...