проблема с logstash grok при фильтрации данных - PullRequest
1 голос
/ 21 мая 2019

У меня есть данные, которые в основном предназначены для удаления данных с помощью команды rm, которые выглядят следующим образом.

ttmv516,19/05/21,03:59,00-mins,dvcm,dvcm 166820 4.1 0.0 4212 736 ? DN 03:59 0:01 rm -rf /dv/project/agile/mce_dev_folic/test/install.asan/install,/dv/svgwwt/commander/workspace4/dvfcronrun_IL-SFV-RHEL6.5-K4_kinite_agile_invoke_dvfcronrun_at_given_site_50322

Я использую ниже logstash grok, который работал нормально, но до недавнего времени я видел два странныхпроблема 1) _grokparsefailure еще 2) Hostname Field не отображается правильно, т.е. его начальные символы отсутствуют, как ttmv516 будет выглядеть как mv516.

%{HOSTNAME:Hostname},%{DATE:Date},%{HOUR:dt_h}:%{MINUTE:dt_m},%{NUMBER:duration}-%{WORD:hm},%{USER:User},%{USER:User_1} %{NUMBER:Pid} %{NUMBER:float} %{NUMBER:float} %{NUMBER:Num_1} %{NUMBER:Num_2} %{DATA} (?:%{HOUR:dt_h1}:|)(?:%{MINUTE:dt_m1}|) (?:%{HOUR:dt_h2}:|)(?:%{MINUTE:dt_m2}|)%{GREEDYDATA:CMD},%{GREEDYDATA:PWD_PATH}

Однако, тестирование тоже с Grok Debugger вДанные Kibana отображаются правильно.

enter image description here

Мой файл logstash выглядит следующим образом.

cat /etc/logstash/conf.d/rmlog.conf
input {
  file {
    path => [ "/data/rm_logs/*.txt" ]
    start_position => beginning
    sincedb_path => "/data/registry-1"
    max_open_files => 64000
    type => "rmlog"
  }
}

filter {
  if [type] == "rmlog" {
    grok {
     match => { "message" => "%{HOSTNAME:Hostname},%{DATE:Date},%{HOUR:dt_h}:%{MINUTE:dt_m},%{NUMBER:duration}-%{WORD:hm},%{USER:User},%{USER:User_1} %{NUMBER:Pid} %{NUMBER:float} %{NUMBER:float} %{NUMBER:Num_1} %{NUMBER:Num_2} %{DATA} (?:%{HOUR:dt_h1}:|)(?:%{MINUTE:dt_m1}|) (?:%{HOUR:dt_h2}:|)(?:%{MINUTE:dt_m2}|)%{GREEDYDATA:CMD},%{GREEDYDATA:PWD_PATH}" }
      add_field => [ "received_at", "%{@timestamp}" ]
      remove_field => [ "@version", "host", "message", "_type", "_index", "_score" ]
   }
  }
 }
output {
        if [type] == "rmlog" {
        elasticsearch {
                hosts => ["myhost.xyz.com:9200"]
                manage_template => false
                index => "pt-rmlog-%{+YYYY.MM.dd}"
  }
 }
}

Любое предложение помощи будет высоко оценено.

РЕДАКТИРОВАТЬ:

Сообщения, в которых его сбой в соответствии с моим наблюдением ..

ttmv540,19/05/21,03:59,00-hrs,USER,USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND,/local/ntr/ttmv540.373
ttmv541,19/05/21,03:43,-mins,USER,USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND,/local/ntr/ttmv541.373

Тем не менее, я попытался отредактировать Грок с условием ниже, но все же он сбрасываетнесколько полей ..

input {
  file {
    path => [ "/data/rm_logs/*.txt" ]
    start_position => beginning
    max_open_files => 64000
    sincedb_path => "/data/registry-1"
    type => "rmlog"
  }
}
filter {
  if [type] == "rmlog" {
    grok {
     match => { "message" => "%{HOSTNAME:hostname},%{DATE:date},%{HOUR:time_h}:%{MINUTE:time_m},%{NUMBER:duration}-%{WORD:hm},%{USER:user},%{USER:group} %{NUMBER:pid} %{NUMBER:float} %{NUMBER:float} %{NUMBER:num_1} %{NUMBER:num_2} %{DATA} (?:%{HOUR:time_h1}:|)(?:%{MINUTE:time_m1}|) (?:%{HOUR:time_h2}:|)(?:%{MINUTE:time_m2}|)%{GREEDYDATA:cmd},%{GREEDYDATA:pwd}" }
      add_field => [ "received_at", "%{@timestamp}" ]
      remove_field => [ "@version", "host", "message", "_type", "_index", "_score" ]
   }
  }
  if "_grokparsefailure" in [tags] {
    grok {
      match => { "message" => "%{HOSTNAME:hostname},%{DATE:date},%{HOUR:time_h}:%{MINUTE:time_m},-%{WORD:duration},%{USER:user},%{USER:group}%{GREEDYDATA:cmd}" }
      add_field => [ "received_at", "%{@timestamp}" ]
      remove_field => [ "@version", "host", "message", "_type", "_index", "_score" ]
  }
 }
}
output {
        if [type] == "rmlog" {
        elasticsearch {
                hosts => ["myhost.xyz.com:9200"]
                manage_template => false
                index => "pt-rmlog-%{+YYYY.MM.dd}"
  }
 }
}

Примечание. Похоже, тег _grokparsefailure работает в приведенных ниже сообщениях, но не работает в другом ..

1) это работает ..

 ttmv541,19/05/21,03:43,-mins,USER,USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND,/local/ntr/ttmv541.373

ttmv540,19/05/21,03:59,00-hrs,USER,USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND,/local/ntr/ttmv540.373

2) Вторая строка текстового журнала завершается неудачно, так как имеет 00-hrs номер связанногос этим, теперь не в состоянии выполнить оба условия с ниже грок ..

%{HOSTNAME:hostname},%{DATE:date},%{HOUR:time_h}:%{MINUTE:time_m},-%{WORD:duration},%{USER:user},%{USER:group}%{GREEDYDATA:cmd}

1 Ответ

2 голосов
/ 22 мая 2019

Я бы разделил обработку на две части: одну для решения проблем с именем хоста и отметкой времени, а другую для обработки остальной части строки.Я считаю, что это облегчает обслуживание.

Таким образом, у вас остались эти два входа:

ttmv541,19/05/21,03:43,-mins
ttmv540,19/05/21,03:59,00-hrs

Ваши два паттерна будут хорошо соответствовать первым частям, поэтому вопрос в том, как вы хотите разобрать материал по прошествии времени,В исходном шаблоне вы использовали duration для числовой части и hm для единиц измерения.Во втором паттерне вы, кажется, помещаете единицы в duration, что, вероятно, неправильно.

Без дополнительной информации кажется, что продолжительность необязательна, но у вас всегда будут единицы.Это может быть отражено в вашем паттерне, например:

(%{NUMBER:duration})?-%{WORD:hm}

Также обратите внимание, что если вам когда-нибудь понадобится несколько паттернов, вам не нужно зависеть от того, что их использует grokparsefailure - match-> message можетвзять массив.См. документ для примера.

...