почему logstash прекращает обработку журналов - PullRequest
0 голосов
/ 22 апреля 2019

Logstash прекращает обработку журналов через несколько часов. Когда обработка журналов прекращается, служба logstash потребляет высокую производительность ЦП (всего около 25 ядер из 32). Когда служба logstash работает нормально, она потребляет всего около 4-5 ядер. Трубопровод генерирует около 50 тыс. Событий в минуту. Logstash Conf (не по умолчанию): pipe.workers: 15 pipe.batch.size: 100 JVM CONF: -Xms15g -Xmx15g

input {
  tcp {
    port => 5044
    type => syslog
  }
  udp {
    port => 5044
    type => syslog
  }
}


filter {
  if [type] == "syslog" {
    grok {
      match => [ "message", "%{SYSLOG5424PRI}%{NOTSPACE:syslog_timestamp} %{NOTSPACE:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" ]
    }
    kv {
      id => "logs_kv"
      source => "syslog_message"
      trim_key => " "
      trim_value => " "
      value_split => "="
      field_split => " "
    }

  mutate {
     remove_field  => [ "syslog_message", "syslog_timestamp" ]
    }

    #now check if source IP is a private IP, if so, tag it   
    cidr {
      address => [ "%{srcip}" ]
      add_tag => [ "src_internalIP" ]
      network => [ "10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16" ]
    }

    # don't run geoip if it's internalIP, otherwise find the GEOIP location
    if "src_internalIP" not in [tags] {
      geoip {
        add_tag => [ "src_geoip" ]
        source => "srcip"
    database => "/usr/share/elasticsearch/modules/ingest-geoip/GeoLite2-City.mmdb"
      }
      geoip {
        source => "srcip"
    database => "/usr/share/elasticsearch/modules/ingest-geoip/GeoLite2-ASN.mmdb"
      }

    } 
    else {
      #check DST IP now.  If it is a private IP, tag it 
      cidr {
        add_tag => [ "dst_internalIP" ]
        address => [ "%{dstip}" ]
        network => [ "10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16" ]
      }

      # don't run geoip if it's internalIP, otherwise find the GEOIP location
      if "dst_internalIP" not in [tags] {
        geoip {
          add_tag => [ "dst_geoip" ]
          source => "dstip"
    database => "/usr/share/elasticsearch/modules/ingest-geoip/GeoLite2-City.mmdb"
        }
        geoip {
          source => "dstip"
    database => "/usr/share/elasticsearch/modules/ingest-geoip/GeoLite2-ASN.mmdb"
        }
      }
    }
  }
}

output {
if [type] == "syslog" {

  elasticsearch {hosts => ["127.0.0.1:9200" ]
        index         => "sysl-%{syslog_hostname}-%{+YYYY.MM.dd}"
    }
#stdout { codec => rubydebug }
}
}

Когда logstash прекращает обработку, я не вижу никаких ошибок в файле журнала (уровень журнала - трассировка). Видят только эти сообщения:

[2019-04-19T00:00:12,004][DEBUG][logstash.instrument.periodicpoller.jvm] collector name {:name=>"ConcurrentMarkSweep"}
[2019-04-19T00:00:17,011][DEBUG][logstash.instrument.periodicpoller.jvm] collector name {:name=>"ParNew"}
[2019-04-19T00:00:17,012][DEBUG][logstash.instrument.periodicpoller.jvm] collector name {:name=>"ConcurrentMarkSweep"}
[2019-04-19T00:00:22,015][DEBUG][logstash.instrument.periodicpoller.jvm] collector name {:name=>"ParNew"}
[2019-04-19T00:00:22,015][DEBUG][logstash.instrument.periodicpoller.jvm] collector name {:name=>"ConcurrentMarkSweep"}
[2019-04-19T00:00:27,023][DEBUG][logstash.instrument.periodicpoller.jvm] collector name {:name=>"ParNew"}
[2019-04-19T00:00:27,024][DEBUG][logstash.instrument.periodicpoller.jvm] collector name {:name=>"ConcurrentMarkSweep"}
[2019-04-19T00:00:32,030][DEBUG][logstash.instrument.periodicpoller.jvm] collector name {:name=>"ParNew"}
[2019-04-19T00:00:32,030][DEBUG][logstash.instrument.periodicpoller.jvm] collector name {:name=>"ConcurrentMarkSweep"}

формат событий:

[2019-04-22T13:04:27,190][DEBUG][logstash.pipeline        ] filter received {"event"=>{"type"=>"syslog", "@version"=>"1", "@timestamp"=>2019-04-22T10:04:27.159Z, "port"=>50892, "message"=>"<30>2019:04:22-13:05:08 msk ulogd[18998]: id=\"2002\" severity=\"info\" sys=\"SecureNet\" sub=\"packetfilter\" name=\"Packet accepted\" action=\"accept\" fwrule=\"6\" initf=\"eth2\" outitf=\"eth1\" srcmac=\"70:79:b3:ab:e0:e8\" dstmac=\"00:1a:8c:f0:89:02\" srcip=\"10.0.134.138\" dstip=\"10.0.131.134\" proto=\"17\" length=\"66\" tos=\"0x00\" prec=\"0x00\" ttl=\"126\" srcport=\"63936\" dstport=\"53\" ", "host"=>"10.0.130.235"}}

Помогите, пожалуйста, отладить эту проблему.

Ответы [ 2 ]

0 голосов
/ 23 апреля 2019

решаемая.Проблема была в фильтре kv, который останавливал logstash при попытке проанализировать неструктурированные данные, проходящие через конвейер.

0 голосов
/ 22 апреля 2019

По данным интернета, сборщик мусора ParNew - это "останови мир".Если для возобновления требуется 5 секунд, а вы получаете GC каждые 5 секунд, вы не получите пропускную способность от logstash, поскольку он всегда блокируется.

...