Перевод спарк-агрегации данных в SQL-запрос;проблемы с окном, сгруппировкой, и как агрегировать? - PullRequest
0 голосов
/ 28 мая 2018

Я бездельничаю с данными из Spark: Полное руководство и использую Java только для целей полной округлости.

Я хорошо читаю данные из CSV и создаю временныйпросмотреть таблицу следующим образом:

Dataset<Row> staticDataFrame = spark.read().format("csv").option("header","true").option("inferSchema","true").load("/data/retail-data/by-day/*.csv");

staticDataFrame.createOrReplaceTempView("SalesInfo");

spark.sql("SELECT CustomerID, (UnitPrice * Quantity) AS total_cost, InvoiceDate from SalesInfo").show(10);

Это работает нормально и возвращает следующие данные:

+----------+------------------+--------------------+
|CustomerID|        total_cost|         InvoiceDate|
+----------+------------------+--------------------+
|   14075.0|             85.92|2011-12-05 08:38:...|
|   14075.0|              25.0|2011-12-05 08:38:...|
|   14075.0|39.599999999999994|2011-12-05 08:38:...|
|   14075.0|              30.0|2011-12-05 08:38:...|
|   14075.0|15.299999999999999|2011-12-05 08:38:...|
|   14075.0|              40.8|2011-12-05 08:38:...|
|   14075.0|              39.6|2011-12-05 08:38:...|
|   14075.0|             40.56|2011-12-05 08:38:...|
|   18180.0|              17.0|2011-12-05 08:39:...|
|   18180.0|              17.0|2011-12-05 08:39:...|
+----------+------------------+--------------------+
only showing top 10 rows

У меня возникают проблемы при попытке сгруппировать его по CustomerID, однако при попыткесгруппировать его по CustomerID,

spark.sql("SELECT CustomerID, (UnitPrice * Quantity) AS total_cost, InvoiceDate from SalesInfo GROUP BY CustomerID").show(10);

Я получаю:

Exception in thread "main" org.apache.spark.sql.AnalysisException: expression 'salesinfo.`UnitPrice`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.

Я понимаю, что я делаю неправильно, а именно, что он не знает, как агрегировать total_costи дата выставления счета, но я застрял на том, как сделать это с точки зрения SQL;книга использует функции агрегации искры и делает это:

selectExpr(
"CustomerId",
"(UnitPrice * Quantity) as total_cost",
"InvoiceDate")

.groupBy (col ("CustomerId"), window (col ("InvoiceDate"), "1 day")) .sum ("total_cost")

Но я пытаюсь понять, как я могу это сделать с помощью операторов SQL в качестве учебного упражнения.

Любая помощь в том, как сделать это с помощью запроса, приветствуется.

Я пытаюсь выяснить, как это сделать, когда я просто получаю общие итоги по каждому идентификатору клиента, а затем как получить полную функциональность изложения искры книги, где это общая сумма с разбивкой по часам по идентификатору клиента.

Спасибо

РЕДАКТИРОВАТЬ: Это источник данных;Я просто читаю все это в одном наборе данных

https://github.com/databricks/Spark-The-Definitive-Guide/tree/master/data/retail-data/by-day

Ответы [ 2 ]

0 голосов
/ 28 мая 2018

Чтобы завершить ответ Джеффа Хакера, вы можете использовать метод explain(true) в своем объекте DataFrame, чтобы увидеть план выполнения:

== Physical Plan ==
*(2) HashAggregate(keys=[CustomerId#16, window#41], functions=    [sum(total_cost#26)], output=[CustomerId#16, window#41, sum(total_cost)#35])
  +- Exchange hashpartitioning(CustomerId#16, window#41, 200)
    +- *(1) HashAggregate(keys=[CustomerId#16, window#41], functions=[partial_sum(total_cost#26)], output=[CustomerId#16, window#41, sum#43])
      +- *(1) Project [named_struct(start,     precisetimestampconversion(((((CASE WHEN     (cast(CEIL((cast((precisetimestampconversion(InvoiceDate#14, TimestampType,     LongType) - 0) as double) / 8.64E10)) as double) =     (cast((precisetimestampconversion(InvoiceDate#14, TimestampType, LongType) - 0) as double) / 8.64E10)) THEN (CEIL((cast((precisetimestampconversion(InvoiceDate#14, TimestampType, LongType) - 0) as double) / 8.64E10)) + 1) ELSE CEIL((cast((precisetimestampconversion(InvoiceDate#14, TimestampType, LongType) - 0) as double) / 8.64E10)) END + 0) - 1) * 86400000000) + 0), LongType, TimestampType), end, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(InvoiceDate#14, TimestampType, LongType) - 0) as double) / 8.64E10)) as double) = (cast((precisetimestampconversion(InvoiceDate#14, TimestampType, LongType) - 0) as double) / 8.64E10)) THEN (CEIL((cast((precisetimestampconversion(InvoiceDate#14, TimestampType, LongType) - 0) as double) / 8.64E10)) + 1) ELSE CEIL((cast((precisetimestampconversion(InvoiceDate#14, TimestampType, LongType) - 0) as double) / 8.64E10)) END + 0) - 1) * 86400000000) + 86400000000), LongType, TimestampType)) AS window#41, CustomerId#16, (UnitPrice#15 * cast(Quantity#13 as double)) AS total_cost#26]
     +- *(1) Filter isnotnull(InvoiceDate#14)
        +- *(1) FileScan csv [Quantity#13,InvoiceDate#14,UnitPrice#15,CustomerID#16] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/tmp/spark/retail/2010-12-01.csv, file:/tmp/spar..., PartitionFilters: [], PushedFilters: [IsNotNull(InvoiceDate)], ReadSchema: struct<Quantity:int,InvoiceDate:timestamp,UnitPrice:double,CustomerID:double>

Как видите, Spark создает ключ агрегации из CustomerId и окна (00: 00: 00 - 23:59:59 для каждого дня) [HashAggregate(keys=[CustomerId#16, window#41]] и перемещает все строки с такими ключами в один раздел (Exchange hashpartitioning).Этот факт перемещения данных между разделами известен как случайная операция.Позже он выполняет функцию SUM (...) для таких накопленных данных.

При этом выражение GROUP BY с 1 ключом должно генерировать только 1 строку для этого ключа.Таким образом, если в исходном запросе вы определили CustomerID в качестве ключа и total_cost с InvoiceDate в проекции, механизм не сможет получить 1 строку для CustomerID, поскольку 1 CustomerID может иметь несколько InvoiceDate.Нет никаких исключений в отношении языка SQL.

0 голосов
/ 28 мая 2018

Итак, я интерпретирую то, что вы говорите, как UnitPrice * Количество на клиента, в час, в SQL:

select 
    customerid, 
    sum(unitprice * quantity) as total_cost, 
    cast(cast(InvoiceDate as date) as varchar) + ' ' + cast(DATEPART(HH, InvoiceDate) as varchar) + ':00'
from [retail-data] 
group by CustomerID, cast(cast(InvoiceDate as date) as varchar) + ' ' + cast(DATEPART(HH, InvoiceDate) as varchar) + ':00'
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...