Logstash 6.2.4 с модулем netflow
Elasticsearch Версия: 6.2.4
Ubuntu 16.04 LTS
У меня проблема, когда logstash прослушивает правильный порт, но, похоже, не собирает
данные о сетевом потоке и их передача вasticsearch.
Маршрутизаторы в нашей сети отправляют свои данные сетевого потока на сервер A, а nfcap прослушивает порт
9995, поэтому попытка запустить logstash с модулем netflow на сервере A приводит к использованию адреса
ошибка. Итак, я использую iptables для дублирования пакетов и пересылки их на другой сервер, сервер B, вот так.
iptables -t mangle -A PREROUTING -p udp --dport 9995 -j TEE --gateway <Server B ip address>
При проверке с помощью tcpdump я вижу, что дублированные пакеты принимаются сервером B с IP-адресом сервера A.
Вывод ниже, но по соображениям безопасности я отредактировал IP-адреса.
tcpdump -i eno1 -n dst port 9995
12:49:49.130772 IP <Router 1 ip address>.10005 > <Server A ip address>.9995: UDP, length 1392
12:49:49.131067 IP <Router 1 ip address>.10005 > <Server A ip address>.9995: UDP, length 1392
12:49:49.133504 IP <Router 1 ip address>.10005 > <Server A ip address>.9995: UDP, length 1392
12:49:49.133527 IP <Router 1 ip address>.10005 > <Server A ip address>.9995: UDP, length 1392
12:49:49.133533 IP <Router 1 ip address>.10005 > <Server A ip address>.9995: UDP, length 1260
12:49:49.391871 IP <Router 2 ip address>.62500 > <Server A ip address>.9995: UDP, length 1452
12:49:49.391894 IP <Router 2 ip address>.62500 > <Server A ip address>.9995: UDP, length 1368
Итак, я знаю, что Сервер B получает пакеты через порт 9995.
Проверка с помощью netstat также показывает это.
netstat -an | grep 9995
udp 0 0 0.0.0.0:9995 0.0.0.0:*
logstash.yml выглядит следующим образом
node.name: server-b
path.data: /var/lib/logstash
http.host: "0.0.0.0"
modules:
- name: netflow
var.input.udp.port: 9995 # Inbound connections
var.elasticsearch.hosts: "<ip address>:9200"
var.kibana.host: "<ip address>:5601"
path.logs: /var/log/logstash
Проверка /var/log/logstash/logstash-plain.log, единственное предупреждение, которое я вижу, о том, что версияasticsearch больше версии 6, поэтому тип не будет использоваться для
определить тип документа.
[2018-07-06T12:58:13,771][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"6.2.4"}
[2018-07-06T12:58:13,817][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
[2018-07-06T12:58:17,599][INFO ][logstash.pipeline ] Starting pipeline {:pipeline_id=>"module-netflow", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[2018-07-06T12:58:17,733][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://<ip address>:9200/]}}
[2018-07-06T12:58:17,734][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://<ip address>:9200/, :path=>"/"}
[2018-07-06T12:58:17,784][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://<ip address>:9200/"}
[2018-07-06T12:58:17,810][INFO ][logstash.outputs.elasticsearch] ES Output version determined {:es_version=>6}
[2018-07-06T12:58:17,810][WARN ][logstash.outputs.elasticsearch] Detected a 6.x and above cluster: the `type` event field won't be used to determine the document _type {:es_version=>6}
[2018-07-06T12:58:17,811][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//<ip address>:9200"]}
[2018-07-06T12:58:18,088][INFO ][logstash.filters.geoip ] Using geoip database {:path=>"/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/vendor/GeoLite2-City.mmdb"}
[2018-07-06T12:58:18,101][INFO ][logstash.filters.geoip ] Using geoip database {:path=>"/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/vendor/GeoLite2-ASN.mmdb"}
[2018-07-06T12:58:18,102][INFO ][logstash.filters.geoip ] Using geoip database {:path=>"/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/vendor/GeoLite2-City.mmdb"}
[2018-07-06T12:58:18,103][INFO ][logstash.filters.geoip ] Using geoip database {:path=>"/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/vendor/GeoLite2-ASN.mmdb"}
[2018-07-06T12:58:18,103][INFO ][logstash.filters.geoip ] Using geoip database {:path=>"/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/vendor/GeoLite2-City.mmdb"}
[2018-07-06T12:58:18,103][INFO ][logstash.filters.geoip ] Using geoip database {:path=>"/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/vendor/GeoLite2-ASN.mmdb"}
[2018-07-06T12:58:18,104][INFO ][logstash.filters.geoip ] Using geoip database {:path=>"/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/vendor/GeoLite2-City.mmdb"}
[2018-07-06T12:58:18,104][INFO ][logstash.filters.geoip ] Using geoip database {:path=>"/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/vendor/GeoLite2-ASN.mmdb"}
[2018-07-06T12:58:18,120][INFO ][logstash.inputs.udp ] Starting UDP listener {:address=>"0.0.0.0:9995"}
[2018-07-06T12:58:18,126][INFO ][logstash.pipeline ] Pipeline started successfully {:pipeline_id=>"module-netflow", :thread=>"#<Thread:0x16700849@/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:247 sleep>"}
[2018-07-06T12:58:18,131][INFO ][logstash.inputs.udp ] UDP listener started {:address=>"0.0.0.0:9995", :receive_buffer_bytes=>"212992", :queue_size=>"2000"}
[2018-07-06T12:58:18,135][INFO ][logstash.agent ] Pipelines running {:count=>1, :pipelines=>["module-netflow"]}
ElasticSearch работает и получает данные из пакета Beat и File Beat, в /var/log/elasticsearch/elasticsearch.log нет ничего, что могло бы подсказать любую ошибку сластиком поиска.
Тем не менее ,asticsearch не имеет шаблон индекса для netflow. Кибана, с другой стороны, делает.
Итак, logstash на сервере B прослушивает 0.0.0.0:9995, порт 9995 открыт и получает пакеты от сервера A, но logstash не распознает эти пакеты.
Я предполагаю, что Сервер B игнорирует их, потому что IP-адрес назначения совпадает с адресом Сервера A. Это звучит правильно? Если так, есть ли вокруг этого?
Есть ли лучший способ переслать дублированные пакеты с сервера A на сервер B и прочитать их в logstash?
К сожалению, добавить другое назначение экспортера сетевого потока в конфигурации маршрутизатора невозможно.