У меня есть логи, которые я посылаю из скрипта Python.Я упаковываю их как массив, выкидываю их через json и отправляю в rabbitMQ с помощью библиотеки 'pika'.Затем logstash ловит его из rabbitMQ, анализирует и помещает в эластичный поиск.Теперь я должен сжать их, потому что там большой трафик журналов, и они очень похожи.
Я попытался сжать его, используя следующие способы: Один:
def gzip_str(string_):
out = io.BytesIO()
with gzip.GzipFile(fileobj=out, mode='w') as fo:
fo.write(string_.encode())
bytes_obj = out.getvalue()
return bytes_obj
Два:
def gZipString(stringtoZip):
out = StringIO()
with gzip.GzipFile(fileobj=out, mode="w") as f:
f.write(stringtoZip)
return out.getvalue()
Выдает сообщение, подобное этому:
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='kit_logstash', durable=True)
message_properties = pika.BasicProperties(
delivery_mode=2, headers={"content-encoding": "gzip"})
channel.basic_publish(exchange='kit_local',
routing_key='logstash',
body=<RETURN_FROM_GZIP_FUNCTION>,
properties=message_properties)
connection.close()
Logstash принимает данные, используя конфигурацию, подобную этой:
input {
rabbitmq {
host => "localhost"
port => 5672
user => "guest"
password => "guest"
metadata_enabled => true
queue => "kit_logstash"
durable => true
id => "rabbitmq_kit_event"
exchange => "kit_local"
type => "kitlog-rabbitmq"
}
}
filter {
if [type] == "kitlog-rabbitmq" {
ruby {
init => "
require 'zlib'
require 'stringio'
"
code => "
body = event.get('[http][response][body]').to_s
sio = StringIO.new(body)
gz = Zlib::GzipReader.new(sio)
result = gz.read.to_s
event.set('[http][response][body]', result)
"
}
}
}
Также пытался использовать входной кодек gzip_lines.
Но главная проблема, насколько я понимаю, в том, что данные в формате gzipped содержат что-то, кроме содержимого (например, метаданных или заголовков), поэтому я не могу быть разархивирован правильно.
Ошибки выглядят следующим образом:
[ERROR][logstash.filters.ruby ] Ruby exception occurred: not in gzip format
[DEBUG][logstash.pipeline ] output received {"event"=>{"@timestamp"=>2018-11-30T09:16:19.127Z, "tags"=>["_rubyexception"], "@version"=>"1", "message"=>"x^\\x8B\\xAEV*\\xCE\\xCE\\xCC\\xC9)V\\xB2R\\x88V\\xD26T07\\xB7\\xB0\\xB4\\xB44000W\\x8A\\xD5QPJ\\xCE\\xCF+IL.\\u0001\\xCA*)\\u0001\\xB9\\xA9\\xB9\\x89\\x999 N\\x96C\\x96^r~.X,\\xA5\\u0014(R\\xADT\\x9A\\u000E6#\\xA0\\xB2$#?\\u000F\\xAC\\xB9\\u0000\\\"\\xE2\\u001C\\xAC\\u0014[\\v\\xE4\\xE6%概\\xF4z\\u0001\\xE9b%\\xA0\\xC8\\xC0\\xD9\\u001D\\v\\u0000\\u0003\\x9ADk", "type"=>"kitlog-rabbitmq"}}
Может быть, мне стоит как-то вырезать эти данные?Или использовать поток в любом случае?Есть идеи?