Конвертировать HiveQL в Spark Scala - PullRequest
0 голосов
/ 17 января 2019

Я хочу преобразовать запрос HiveQL с оконной функцией в запрос Scala Spark ... но я постоянно получаю одно и то же исключение.

Контекст проблемы: mytable состоит из category и product полей. Я хочу получить список с топовым частым продуктом для каждой категории. DF ниже HiveContext объект

Оригинальный запрос (работает правильно):

SELECT category, product, freq FROM (
    SELECT category, product, COUNT(*) AS freq, 
    ROW_NUMBER() OVER (PARTITION BY category ORDER BY COUNT(*) DESC) as seqnum
    FROM mytable GROUP BY category, product) ci 
WHERE seqnum <= 10;

Что у меня сейчас (частично преобразовано, не работает):

val w = row_number().over(Window.partitionBy("category").orderBy(count("*").desc))
val result = df.select("category", "product").groupBy("category", "product").agg(count("*").as("freq"))
val new_res = result.withColumn("seqNum", w).where(col("seqNum") <= 10).drop("seqNum")

Постоянно получает следующее исключение:

Исключение в потоке "main" org.apache.spark.sql.AnalysisException: выражение 'category' не присутствует в группе и не является агрегатной функцией. Добавьте в group by или добавьте first () (или first_value), если вам все равно, какое значение вы получите.;

Что здесь может быть не так?

1 Ответ

0 голосов
/ 17 января 2019

Ваша ошибка - использовать агрегат в предложении orderBy:

.orderBy(count("*").desc)

Если написано так, выражение вводит новое агрегированное выражение. Вместо этого вы должны ссылаться на существующий агрегат по имени:

.orderBy("freq")

Итак, ваш код должен выглядеть так:

val w = row_number().over(
  Window.partitionBy("category").orderBy("freq"))
val result = df.select("category", "product")
  .groupBy("category", "product")
  .agg(count("*").as("freq"))
val new_res = result
  .withColumn("seqNum", w).where(col("seqNum") <= 10)
  .drop("seqNum")
...