Не на 100% понятно, чего вы пытаетесь достичь. См. Комментарий выше к вашему вопросу о добавлении дополнительных сведений, чтобы помочь людям понять, чего вы пытаетесь достичь.
Тем не менее, я могу сказать ....
Функция Min
- это не распознается по двум причинам:
- Вы передаете вывод
TIMESTAMPTOSTRING
в MIN
, но MIN
не принимает строку. - Вы можете ' t используйте агрегатную функцию в предложении
WHERE
.
Сообщение об ошибке, которое вы видите, похоже на ошибку. Если он все еще существует в последней версии ksqlDB, вы можете поднять проблему в проекте ksqlDB GitHub .
Даже исправление этих двух вещей, которые вы запрашиваете, все равно не удастся, так как окно в ksqlDB требует агрегирования, поэтому вам понадобится GROUP BY
.
Если, например, вы хотите захватить минимум metric_datetime_utc
на metric_value
для каждого 5-минутного окна, вы можете сделать это с помощью :
CREATE TABLE start_metric_value AS
SELECT
metric_value,
MIN(metric_datetime_utc) as minTs
FROM dataaggregaion
WINDOW TUMBLING (SIZE 5 MINUTE)
GROUP BY metric_value;
Будет создана оконная таблица, то есть таблица, в которой ключ состоит из metric_value
и WINDOWSTART
времени. minTs
сохранит минимальную дату и время.
Давайте пропустим некоторые данные через запрос, чтобы понять, что происходит:
Ввод:
rowtime | metric_value | metric_datetime_utc
--------|---------------|--------------------
1 | A | 3
2 | A | 4
3 | A | 2
4 | B | 5
300000 | A | 6
Вывод в START_METRIC_VALUE
topi c может быть (Примечание: metric_Value и windowStart будут храниться в ключе записи Kafka, а minTs будут в значении):
metric_value | windowStart | minTs
-------------|-------------|------
A | 0 | 3
A | 0 | 3
A | 0 | 2
B | 0 | 5
A | 300000 | 6
Что фактически выводится в topi c будет зависеть от вашего значения cache.max.bytes.buffering
. При установке значения 0
при отключении буферизации будет отображаться вышеуказанный вывод. Однако при включенной буферизации некоторые промежуточные результаты могут не выводиться в Kafka, хотя конечный результат для каждого окна останется прежним. Вы также можете контролировать то, что выводится в Kafka, используя предстоящую функцию SUPPRESS
Вышеупомянутое решение дает вам минимальную временную метку для каждого metric_value. Если вам нужна глобальная минимальная дата и время для каждого окна, вы можете GROUP BY
константу. Обратите внимание, что это направляет все события на один узел ksqlDB, поэтому он плохо масштабируется в качестве решения. Если масштабирование является проблемой, существуют решения, например, например, сначала вычислить минимум metric_value
, а затем выполнить его постобработку, чтобы найти глобальный минимум.
CREATE TABLE start_metric_value AS
SELECT
1 as Key,
MIN(metric_datetime_utc) as minTs
FROM dataaggregaion
WINDOW TUMBLING (SIZE 5 MINUTE)
GROUP BY 1;
Примечание: синтаксис верен для версии 0.10 ksqlDB. Возможно, вам потребуется выполнить корректировку для других версий.