У меня есть поток с 1 строковым атрибутом, который содержит событие Json.
Этот поток получает различные события, к которым я хочу применить выражения пути Json, чтобы я мог использовать эти атрибуты в фильтрах и функциях.
Экстракторы JsonPath работают как фильтры для фильтров и селекторов, к сожалению,Я не могу использовать их для части «Группировать по».На самом деле я делаю это во встроенном приложении Siddhi с добавленным вручную расширением siddhi-execute-json, но для обсуждения, чтобы каждый мог легко проверить и протестировать его, я вставлю пример приложения, которое работает на потоковом процессоре WSO2.Задача выглядит следующим образом:
@App:name("Group_by_json_attribute")
define stream JsonStream(json string);
@sink(type='log')
define stream LogStream(myField string, count long);
@info(name='query1')
from JsonStream#window.time(10 sec)
select json:getString(json, '$.myField') as myField, count() as count
group by myField having count > 1
insert into LogStream;
и может принимать следующие события:
{"myField": "my_value"}
Однако этот запрос вызовет ошибку:
Cannot find attribute type as 'myField' does not exist in 'JsonStream'; define stream JsonStream(json string)
Я также пытался использовать экстрактор Json непосредственно в «Группировать по»:
group by json:getString(json, '$.myField') as myField having count > 1
Однако ошибка теперь:
mismatched input ':' expecting {',', ORDER, LIMIT, OFFSET, HAVING, INSERT, DELETE, UPDATE, RETURN, OUTPUT}
, который, кажется, не ожидает использовать здесь расширение
Мне просто интересно, возможно ли группировать по атрибутам, не определенным непосредственно ввходной поток.В этом случае поле извлекается из объекта JSON, но это может быть любая другая функция, которая генерирует другой атрибут.
Я также использую версии из центрального хранилища maven
- Siddhi : io.siddhi: ядро siddhi: 5.0.1
- siddhi-execute-json : io.siddhi.extension.execution.json: siddhi-execute-json: 2.0.1
(Правка) Разъяснение
Цель состоит в том, чтобы использовать атрибуты, не определенные в Stream, для использования в Group By
.
Причина в том, что в настоящее время у меня есть встроенное приложение, которое определяет весь набор входных потоков, поступающих из внешних источников, отформатированных в формате JSON, а также есть набор выходных потоков для информирования внешних компонентов, когдазапрос совпадений.Это приложение позволяет пользователям создавать пользовательские запросы для этого набора предварительно определенных потоков, но они не могут создавать потоки самостоятельно.
Большое спасибо!