Как построить хронологический список событий в Splunk - PullRequest
0 голосов
/ 14 апреля 2019

У меня есть индекс, который перечисляет (среди прочего) устройство, дату события и уровень (1-4). Устройства меняют уровни через случайные интервалы. Мне нужно построить поиск, который показывает, как долго определенное устройство находится на определенном уровне, но я не могу сделать простой подсчет; если устройство находится на уровне 1 в течение трех дней, переходит на уровень 2 на пять дней, а затем обратно на уровень 1 на два дня, счетчик покажет пять дней, что, очевидно, неверно. Как создать поле «Последовательные дни на текущем уровне»?

Мне нужен запрос, который сообщает об устройстве, дате, уровне и днях на текущем уровне. Спасибо!

1 Ответ

0 голосов
/ 18 апреля 2019

Я подозреваю, транзакция - это то, что вы ищете. Это группирует события с полями, которые соответствуют (и / или другим критериям), и сообщает о количестве событий и продолжительности.

К сожалению, неясно, как выглядят ваши данные .... Сначала я подумал, что это может быть приближением ваших данных ... устройства проверяют в случайное время, давая отчет о том, на каком уровне они оказались в то время:

| makeresults count=100 
| eval decrementsecs=random()%10, decrementday=random()%2,state=random()%4+1,device=random()%10+1,device=case(device=1,"A",device=2,"B",device=3,"C",device=4,"D",device=5,"E",device=6,"F",device=7,"G",device=8,"H",device=9,"I",device=10,"J") 
| streamstats sum(decrementday) as days sum(decrementsecs) as secs 
| eval _time=_time-secs-(24*60*60*days) 
| fields - days secs decrement* 

В этом мире выборки вы ищете количество времени, прошедшее с того момента, как устройство сообщило о состоянии, отличном от самого последнего сообщенного состояния ... что-то вроде:

<base search>
| streamstats current=f global=f window=2 last(state) as last by device 
| where isnull(last) OR last!=state 
| dedup 2 device 
| transaction device
| eval days_at_current_level=if(eventcount=2,round(duration/(24*60*60)),"unknown")

Если, однако, эти выборки не являются выборками и фактически являются самими переходами, то:

<base search>
| stats max(_time) as _time latest(state) as level by device
| eval days_at_current_level=round((now()-_time)/(24*60*60))

Но если судить по тому, как вы описывали вещи, может показаться, что у вас есть X устройств, каждое из которых ежедневно сообщает о своем состоянии ... в этом случае ваши данные на самом деле, вероятно, выглядят так:

| makeresults count=10
| streamstats count
| eval _time=_time-(24*60*60*(count-1)), device=split("A,B,C,D,E,F,G,H,I,J,K,L,M,N,O,P",",")
| mvexpand device
| eval level=random()%4+1

И в этом случае, поскольку я не беспокоюсь об изменениях в течение нескольких дней, мой поиск может выглядеть так просто:

<base search>
| transaction device level

(Предостережение с транзакцией, помните о необходимых данных и ограничивайте данные, поступающие в транзакцию, для повышения производительности ... если вам нужны только _time, устройство и уровень, используйте поля, чтобы избавиться от всего остального до транзакции .... например ... | fields - _raw | fields _time device level Ограничивая данные до транзакции, вы ограничиваете данные, возвращаемые индексаторами, и ограничивает объем данных, которые команда транзакции должна хранить вместе / отслеживать и таким образом, вы получите лучшую производительность)

...