In a nutshell, I have this stack: FileBeat reads a certain log file and pushes it to a Kafka topic; LogStash consumes from that Kafka topic and inserts the documents into ElasticSearch. To sum up: "File logs -> FileBeat -> Kafka Topic -> LogStash -> ElasticSearch".

My docker-compose.yml:
version: '3.2'
services:
  kibana:
    image: docker.elastic.co/kibana/kibana:7.5.2
    volumes:
      - "./kibana.yml:/usr/share/kibana/config/kibana.yml"
    restart: always
    environment:
      - SERVER_NAME=kibana.localhost
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    ports:
      - "5601:5601"
    links:
      - elasticsearch
    depends_on:
      - elasticsearch
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.5.2
    environment:
      - cluster.name=docker-cluster
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      - xpack.security.enabled=false
      - xpack.watcher.enabled=false
      - discovery.type=single-node
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - "./esdata:/usr/share/elasticsearch/data"
    ports:
      - "9200:9200"
  logstash:
    image: docker.elastic.co/logstash/logstash:7.5.2
    volumes:
      - "./logstash.conf:/config-dir/logstash.conf"
    restart: always
    command: logstash -f /config-dir/logstash.conf
    ports:
      - "9600:9600"
      - "7777:7777"
    links:
      - elasticsearch
      - kafka1
      - kafka2
      - kafka3
  kafka1:
    image: wurstmeister/kafka
    command: [start-kafka.sh]
    depends_on:
      - zoo1
      - zoo2
      - zoo3
    links:
      - zoo1
      - zoo2
      - zoo3
    ports:
      - "9092:9092"
    environment:
      KAFKA_LISTENERS: PLAINTEXT://:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092
      KAFKA_BROKER_ID: 1
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_LOG_RETENTION_HOURS: "168"
      KAFKA_LOG_RETENTION_BYTES: "100000000"
      KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181
      KAFKA_CREATE_TOPICS: "log:3:3"
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
  kafka2:
    image: wurstmeister/kafka
    depends_on:
      - zoo1
      - zoo2
      - zoo3
    links:
      - zoo1
      - zoo2
      - zoo3
    ports:
      - "9093:9092"
    environment:
      KAFKA_LISTENERS: PLAINTEXT://:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:9092
      KAFKA_BROKER_ID: 2
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_LOG_RETENTION_HOURS: "168"
      KAFKA_LOG_RETENTION_BYTES: "100000000"
      KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181
      KAFKA_CREATE_TOPICS: "log:3:3"
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
  kafka3:
    image: wurstmeister/kafka
    depends_on:
      - zoo1
      - zoo2
      - zoo3
    links:
      - zoo1
      - zoo2
      - zoo3
    ports:
      - "9094:9092"
    environment:
      KAFKA_LISTENERS: PLAINTEXT://:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:9092
      KAFKA_BROKER_ID: 3
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_LOG_RETENTION_HOURS: "168"
      KAFKA_LOG_RETENTION_BYTES: "100000000"
      KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181
      KAFKA_CREATE_TOPICS: "log:3:3"
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
  zoo1:
    image: elevy/zookeeper:latest
    environment:
      MYID: 1
      SERVERS: zoo1,zoo2,zoo3
    ports:
      - "2181:2181"
  zoo2:
    image: elevy/zookeeper:latest
    environment:
      MYID: 2
      SERVERS: zoo1,zoo2,zoo3
    ports:
      - "2182:2181"
  zoo3:
    image: elevy/zookeeper:latest
    environment:
      MYID: 3
      SERVERS: zoo1,zoo2,zoo3
    ports:
      - "2183:2181"
  filebeat:
    image: docker.elastic.co/beats/filebeat:7.5.2
    volumes:
      - "./filebeat.yml:/usr/share/filebeat/filebeat.yml:ro"
      - "./sample-logs:/sample-logs"
    links:
      - kafka1
      - kafka2
      - kafka3
    depends_on:
      - kafka1
      - kafka2
      - kafka3
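After docker-compose up -d, one broker-side sanity check would be to describe the topic and confirm it exists with the expected partitions and replicas. This is a sketch; it assumes the wurstmeister image keeps the stock Kafka CLI scripts under their usual /opt/kafka/bin and that the bundled Kafka version accepts --bootstrap-server:

# Describe the "log" topic from inside the kafka1 container
docker-compose exec kafka1 /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server kafka1:9092 --describe --topic log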
filebeat.yml
filebeat.inputs:
- paths:
    - /sample-logs/request-sample.log
  tags:
    - request-sample
  input_type: log
  document_type: request-sample
  fields_under_root: true

output.kafka:
  hosts: ["kafka1:9092", "kafka2:9092", "kafka3:9092"]
  topic: 'log'
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
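To rule out a malformed Filebeat config, its built-in config test can be run inside the container (a sketch; it relies on the mount path from the compose file above being picked up as the default config location):

# Validate filebeat.yml syntax and settings
docker-compose exec filebeat filebeat test config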
A sample of the log file that is expected to be read:
2020-01-10 13:33:14,782 INFO {"appName":"xxxx-consultar-transacao-java","component":"br.com.bd.components.logger.RequestFilter","logType":"FUNC","env":"dev","eventTime":"20200110133314717","logSeverity":6,"soc":false,"baseUri":"/xxxx/transactions/v1","resourceURI":"/transactions/retained","resourceAction":"GET","entity":"","statusHttpCode":200,"statusCode":"OK","requestBytes":1150,"responseBytes":0,"responseTime":3754,"params":{"transaction_type":["T"],"status":["A"],"start_date":["20191211"],"end_date":["20200110"],"scheduling":["false"],"offset":["1"],"size":["100"]},"header":{"content-length":"1150","postman-token":"30bccc36-952d-4286-9cc4-2a795193fc5b","host":"bcodmswrk01:8087","connection":"keep-alive","cache-control":"no-cache","accept-encoding":"gzip, deflate","user-agent":"PostmanRuntime/7.21.0","accept":"*/*"},"pathParams":{},"src":"10.100.13.250","solicitationID":"e4af7622-a38a-4502-96c0-c8decf9dae64","headerXForwardedFor":"10.100.13.250"}
logstash.conf
input {
  kafka {
    bootstrap_servers => "kafka1:9092,kafka2:9092,kafka3:9092"
    client_id => "logstash"
    group_id => "logstash"
    consumer_threads => 3
    topics => ["log"]
    codec => "json"
    tags => ["log", "kafka_source"]
    type => "log"
  }
}

filter {
  if [type] == "request-sample" {
    grok {
      match => { "message" => "%{COMMONAPACHELOG}" }
    }
    date {
      match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
      remove_field => ["timestamp"]
    }
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "logstash-%{[type]}-%{+YYYY.MM.dd}"
  }
  stdout { codec => rubydebug }
}
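To isolate whether any events reach Logstash at all, the pipeline could be temporarily stripped down to just the Kafka input and a stdout output (a minimal sketch reusing the broker list and topic above; anything consumed would then show up in the logstash container logs):

input {
  kafka {
    bootstrap_servers => "kafka1:9092,kafka2:9092,kafka3:9092"
    topics => ["log"]
    codec => "json"
  }
}
output {
  stdout { codec => rubydebug }
}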
Entering the Elasticsearch container and looking for some index derived from my logs, I don't see it at all:
sh-4.2# curl -XGET 'localhost:9200/_cat/indices'
yellow open twitter _YO2OfkZTyml62a0q_2Vag 1 1 0 0 283b 283b
green open .kibana_task_manager_1 ZPM_sJH8Se6hpqcBNaArxw 1 0 2 1 16.2kb 16.2kb
green open .apm-agent-configuration ZwUEUvCIQzuEeFkXiGAMFg 1 0 0 0 283b 283b
green open .kibana_1 dMtrvO2OSW6OwUsF06B4sg 1 0 7 0 25.6kb 25.6kb
sh-4.2#
Going through the logs of every container, I can't find a single exception or any other information that would hint at which part of the stack is failing.

Can anyone tell me whether I'm missing some additional FileBeat configuration needed to start the process, or any other configuration I should check?
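For reference, one way to check whether Filebeat messages ever land on the topic is to consume it directly from a broker container (again assuming the stock console consumer shipped under /opt/kafka/bin in the image):

# Read the first few messages from the "log" topic, if any exist
docker-compose exec kafka1 /opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server kafka1:9092 --topic log --from-beginning --max-messages 5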
*** edited
Kafka logs from the container:
[2020-02-06 01:47:32,621] WARN [SocketServer brokerId=1] Unexpected error from /172.26.0.1; closing connection (org.apache.kafka.common.network.Selector)
org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 1212498244 larger than 104857600)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:104)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
at kafka.network.Processor.poll(SocketServer.scala:890)
at kafka.network.Processor.run(SocketServer.scala:789)
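One detail worth decoding: the rejected size 1212498244 is not a real message size. Read as four ASCII bytes it spells "HEAD" (0x48 0x45 0x41 0x44), which typically means an HTTP client spoke to the broker's PLAINTEXT port instead of the Kafka wire protocol; 172.26.0.1 is most likely the Docker bridge gateway, i.e. something on the host probing port 9092, so this warning may be unrelated to Filebeat. The decoding can be verified in a shell:

# The "size" prefix is really the first four bytes of the payload
printf '%x\n' 1212498244        # -> 48454144
echo 48454144 | xxd -r -p; echo # -> HEAD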
*** edited
