Коррелированная скалярная переменная должна быть агрегирована для скалярного подзапроса в искре - PullRequest
1 голос
/ 02 апреля 2019

У меня есть Dataset<Row>, который содержит шесть столбцов, как показано ниже:

 +---------------+---------------+----------------+-------+--------------+--------+
 |  time         | thingId       |     controller | module| variableName |  value |
 +---------------+---------------+----------------+-------+--------------+--------+
 |1554188264901  |  0002019000000|        0       | 0     |Voltage       |    5   |
 |1554188264901  |  0002019000000|        0       | 0     |SetPoint      |    7   |
 |1554188276412  |  0002019000000|        0       | 0     |Voltage       |    9   |
 |1554188276412  |  0002019000000|        0       | 0     |SetPoint      |    10  |  
 |1554188639406  |  0002019000000|        0       | 0     |SetPoint      |    6   |
 +---------------+---------------+----------------+-------+--------------+--------+

Конечная цель:

Получение последней обновленной строки на основе MAX(time) для комбинации thingId, controller, module и variableName.

, поэтому требуемый вывод должен иметь MAX(time) во всех строках и last_updatedValue для остальных значений variableName.

 +---------------+---------------+----------------+-------+--------------+--------+
 |  time         | thingId       |     controller | module| variableName |  value |
 +---------------+---------------+----------------+-------+--------------+--------+
 |1554188639406  |  0002019000000|        0       | 0     |SetPoint      |    6   |
 +---------------+---------------+----------------+-------+--------------+--------+

и столбец variableName имеет два значения ('Voltage' и 'SetPoint') для этого конкретного thingId, контроллера и модуля, поэтому для значения Voltage в столбце variableName должно возвращаться последняя обновленная строка для значения Voltage с MAX(time).

как показано ниже, Ожидаемый результат:

 +---------------+---------------+----------------+-------+--------------+--------+
 |  time         | thingId       |     controller | module| variableName |  value |
 +---------------+---------------+----------------+-------+--------------+--------+
 |1554188276412  |  0002019000000|        0       | 0     |Voltage       |    9   |
 |1554188639406  |  0002019000000|        0       | 0     |SetPoint      |    6   |
 +---------------+---------------+----------------+-------+--------------+--------+

Что я пробовал:

Я пытался Scalar sub-query получить это, но столбец внутри подзапроса должен был быть агрегирован, я пытался разными способами, но безуспешно.

Например, этот код ниже:

 Dataset<Row> inputds = spark.read().format("avro").load("hdfs://path");
 inputds.createOrReplaceTempView("abc");
 Dataset<Row> update = spark.sql("select MAX(p.time) max_time, p.thingId, p.controller, p.module, p.variableName, (SELECT d.value from abc d where d.thingId=p.thingId and d.controller=p.controller and d.module=p.module and d.variableName=p.variableName group by thingId,controller,module,variableName,value) as values from abc p")
 update.show();

который выдает ошибку:

Коррелированная скалярная переменная должна быть агрегирована для скалярного подзапроса

Как я могу решить это? Пожалуйста, предложите мне, если есть какие-либо обходные пути.

Спасибо!

Ответы [ 2 ]

1 голос
/ 03 апреля 2019

Я решил это, наконец, используя struct в наборе искровых данных.

Входной набор данных

 +---------------+---------------+----------------+-------+--------------+--------+
 |  time         | thingId       |     controller | module| variableName |  value |
 +---------------+---------------+----------------+-------+--------------+--------+
 |1554188264901  |  0002019000000|        0       | 0     |Voltage       |    5   |
 |1554188264901  |  0002019000000|        0       | 0     |SetPoint      |    7   |
 |1554188276412  |  0002019000000|        0       | 0     |Voltage       |    9   |
 |1554188276412  |  0002019000000|        0       | 0     |SetPoint      |    10  |  
 |1554188639406  |  0002019000000|        0       | 0     |SetPoint      |    6   |
 +---------------+---------------+----------------+-------+--------------+--------+

 Dataset<Row> intermediate = inputDS.groupby("thingId","controller","module","variableName").agg(max(struct("time","value")).as("time_value_struct")).select("thingId","controller","module","variableName","time_value_struct.*");

 //above code gives me intermediate output
 +---------------+---------------+----------------+-------+--------------+--------+
 |  time         | thingId       |     controller | module| variableName |  value |
 +---------------+---------------+----------------+-------+--------------+--------+
 |1554188276412  |  0002019000000|        0       | 0     |Voltage       |    9   |
 |1554188639406  |  0002019000000|        0       | 0     |SetPoint      |    6   |
 +---------------+---------------+----------------+-------+--------------+--------+

Так что теперь моя задача - взять максимальное значение из столбца time и заполнить его для этого thingId, контроллера и модуля для используемого SQL, как показано ниже

intermediate.createOrReplaceTempView("intermediate");

Dataset<Row> outputDS = spark.sql("select B.time,A.thingId,A.controller,A.module,A.variableName,A.value from intermediate A 
inner join (select thingId,controller,module,MAX(time)time from intermediate group by thingId,controller,module) B 
on A.thingId=B.thingId and A.controller=B.controller and A.module=B.module");

Что дает нам Ожидаемый результат

 +---------------+---------------+----------------+-------+--------------+--------+
 |  time         | thingId       |     controller | module| variableName |  value |
 +---------------+---------------+----------------+-------+--------------+--------+
 |1554188639406  |  0002019000000|        0       | 0     |Voltage       |    9   |
 |1554188639406  |  0002019000000|        0       | 0     |SetPoint      |    6   |
 +---------------+---------------+----------------+-------+--------------+--------+

Так что теперь я могу поворачиваться, чтобы получить последнее Обновленное значение для каждого thingId, контроллера и модуля

Я знал, что sql для промежуточного шага имеет внутреннее соединение, если бы я мог найти какой-то эффективный запрос sql вместо внутреннего соединения, который был бы великолепен.

Спасибо @johwhite за помощь

1 голос
/ 02 апреля 2019

Проблема, по-видимому, в том, что вам на самом деле нужны агрегация и сортировка .

Вам необходимо иметь значение, непосредственно связанное с MAX(time), для этого конкретного сгруппированного значения столбца variableName, поэтому в основном это значение, которое находится в той же строке.Поскольку в SQL нет функции агрегирования, вы можете отсортировать результаты подзапроса.

Таким образом, для достижения желаемого значения «последнее обновление» строка, вы сортируете подзапрос по time по убыванию, а затем ограничиваете результаты до 1 строки.

Это может быть что-то вроде этого:

Dataset<Row> update = spark.sql("SELECT
    MAX(p.time) max_time,
    p.thingId, p.controller, p.module, p.variableName,
    (SELECT d.value FROM abc d WHERE d.thingId=p.thingId AND d.controller=p.controller AND d.module=p.module AND d.variableName=p.variableName
        ORDER BY time DESC LIMIT 1) AS [lastUpdatedValue]
FROM abc p
GROUP BY thingId,controller,module,variableName")

PS Я пришел из SQL Server Background, поэтому для этого я обычно делал бы TOP 1.Я не совсем уверен, что LIMIT 1 будет иметь те же эффекты в Apache Spark SQL.

РЕДАКТИРОВАТЬ: Я нашел это , спасибо этот ответ здесь .

В основном речь идет о функции агрегации в spark, называемой first.

Может быть, использование ее в подзапросе решит проблему?

    (SELECT first(d.value) FROM abc d WHERE d.thingId=p.thingId AND d.controller=p.controller AND d.module=p.module AND d.variableName=p.variableName
        ORDER BY time DESC LIMIT 1) AS [lastUpdatedValue]
...