Сжатие и распаковка gzip-строки из Python в Ruby через AMQP (заголовки, метаданные) - PullRequest
0 голосов
/ 30 ноября 2018

У меня есть логи, которые я посылаю из скрипта Python.Я упаковываю их как массив, выкидываю их через json и отправляю в rabbitMQ с помощью библиотеки 'pika'.Затем logstash ловит его из rabbitMQ, анализирует и помещает в эластичный поиск.Теперь я должен сжать их, потому что там большой трафик журналов, и они очень похожи.

Я попытался сжать его, используя следующие способы: Один:

def gzip_str(string_):
    out = io.BytesIO()

    with gzip.GzipFile(fileobj=out, mode='w') as fo:
        fo.write(string_.encode())

    bytes_obj = out.getvalue()
    return bytes_obj

Два:

def gZipString(stringtoZip):
    out = StringIO()
    with gzip.GzipFile(fileobj=out, mode="w") as f:
        f.write(stringtoZip)
    return out.getvalue()

Выдает сообщение, подобное этому:

connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='kit_logstash', durable=True)

message_properties = pika.BasicProperties(
    delivery_mode=2, headers={"content-encoding": "gzip"})

channel.basic_publish(exchange='kit_local',
                      routing_key='logstash',
                      body=<RETURN_FROM_GZIP_FUNCTION>,
                      properties=message_properties)
connection.close()

Logstash принимает данные, используя конфигурацию, подобную этой:

input {
  rabbitmq {
        host => "localhost"
    port => 5672
        user => "guest"
        password => "guest"
        metadata_enabled => true
        queue => "kit_logstash"
        durable => true
        id => "rabbitmq_kit_event"
        exchange => "kit_local"
        type => "kitlog-rabbitmq"
    }
}

filter {

  if [type] == "kitlog-rabbitmq" {
    ruby {
      init => "
        require 'zlib'
        require 'stringio'
      "
      code => "
        body = event.get('[http][response][body]').to_s
        sio = StringIO.new(body)
        gz = Zlib::GzipReader.new(sio)
        result = gz.read.to_s
        event.set('[http][response][body]', result)
      "
    }
  }
}

Также пытался использовать входной кодек gzip_lines.

Но главная проблема, насколько я понимаю, в том, что данные в формате gzipped содержат что-то, кроме содержимого (например, метаданных или заголовков), поэтому я не могу быть разархивирован правильно.

Ошибки выглядят следующим образом:

[ERROR][logstash.filters.ruby    ] Ruby exception occurred: not in gzip format
[DEBUG][logstash.pipeline        ] output received {"event"=>{"@timestamp"=>2018-11-30T09:16:19.127Z, "tags"=>["_rubyexception"], "@version"=>"1", "message"=>"x^\\x8B\\xAEV*\\xCE\\xCE\\xCC\\xC9)V\\xB2R\\x88V\\xD26T07\\xB7\\xB0\\xB4\\xB44000W\\x8A\\xD5QPJ\\xCE\\xCF+IL.\\u0001\\xCA*)\\u0001\\xB9\\xA9\\xB9\\x89\\x999 N\\x96C\\x96^r~.X,\\xA5\\u0014(R\\xADT\\x9A\\u000E6#\\xA0\\xB2$#?\\u000F\\xAC\\xB9\\u0000\\\"\\xE2\\u001C\\xAC\\u0014[\\v\\xE4\\xE6%概\\xF4z\\u0001\\xE9b%\\xA0\\xC8\\xC0\\xD9\\u001D\\v\\u0000\\u0003\\x9ADk", "type"=>"kitlog-rabbitmq"}}

Может быть, мне стоит как-то вырезать эти данные?Или использовать поток в любом случае?Есть идеи?

...