Logsta sh Code c - Реестр схемы Avro: сбой avro_schema_registry из-за неопределенной локальной переменной или метода esponse - PullRequest
0 голосов
/ 11 февраля 2020

У меня есть Logsta sh conf, который читает из Kafka topi c в json формате, он использует avro_schema_registry для сериализации вывода в avro. Вот файл conf:

input {
  kafka{
    group_id => "test_group"
    topics => ["logs_json"]
    bootstrap_servers => "server2:9094, server1:9094, server3:9094"
    codec => "json"
    consumer_threads => 1
  }
}

output {
  kafka {
    codec => avro_schema_registry {
      endpoint => "http://host_schema_registry:port"
      schema_id  => 1
    }
    value_serializer => "org.apache.kafka.common.serialization.ByteArraySerializer"
    bootstrap_servers => "server1:9094, server1:9094, server1:9094"
    topic_id => "logs_avro"
  }
}

Но я получаю эту ошибку:

org.jruby.exceptions.NameError: (NameError) undefined local variable or method `esponse' for #<SchemaRegistry::Client:0x3c5ad39>
        at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.schema_registry_minus_0_dot_1_dot_0.lib.schema_registry.client.request(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/schema_registry-0.1.0/lib/schema_registry/cli
ent.rb:127) ~[?:?]
        at uri_3a_classloader_3a_.META_minus_INF.jruby_dot_home.lib.ruby.stdlib.net.http.start(uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/net/http.rb:914) ~[?:?]
        at uri_3a_classloader_3a_.META_minus_INF.jruby_dot_home.lib.ruby.stdlib.net.http.start(uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/net/http.rb:609) ~[?:?]
        at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.schema_registry_minus_0_dot_1_dot_0.lib.schema_registry.client.request(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/schema_registry-0.1.0/lib/schema_registry/cli
ent.rb:101) ~[?:?]
        at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.schema_registry_minus_0_dot_1_dot_0.lib.schema_registry.client.schema(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/schema_registry-0.1.0/lib/schema_registry/clie
nt.rb:40) ~[?:?]
        at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_codec_minus_avro_schema_registry_minus_1_dot_1_dot_1.lib.logstash.codecs.avro_schema_registry.get_schema(/usr/share/logstash/vendor/bundle/jruby/2.5.0/g
ems/logstash-codec-avro_schema_registry-1.1.1/lib/logstash/codecs/avro_schema_registry.rb:158) ~[?:?]
        at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_codec_minus_avro_schema_registry_minus_1_dot_1_dot_1.lib.logstash.codecs.avro_schema_registry.encode(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/
logstash-codec-avro_schema_registry-1.1.1/lib/logstash/codecs/avro_schema_registry.rb:246) ~[?:?]
        at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_kafka_minus_10_dot_0_dot_0_minus_java.lib.logstash.outputs.kafka.multi_receive(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logs
tash-integration-kafka-10.0.0-java/lib/logstash/outputs/kafka.rb:219) ~[?:?]
        at org.jruby.RubyArray.each(org/jruby/RubyArray.java:1800) ~[jruby-complete-9.2.8.0.jar:?]
        at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_kafka_minus_10_dot_0_dot_0_minus_java.lib.logstash.outputs.kafka.multi_receive(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logs
tash-integration-kafka-10.0.0-java/lib/logstash/outputs/kafka.rb:217) ~[?:?]
        at org.logstash.config.ir.compiler.OutputStrategyExt$AbstractOutputStrategyExt.multi_receive(org/logstash/config/ir/compiler/OutputStrategyExt.java:118) ~[logstash-core.jar:?]
        at org.logstash.config.ir.compiler.AbstractOutputDelegatorExt.multi_receive(org/logstash/config/ir/compiler/AbstractOutputDelegatorExt.java:101) ~[logstash-core.jar:?]
        at usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline.start_workers(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:262) ~[?:?]
[2020-02-11T13:11:41,720][ERROR][org.logstash.execution.WorkerLoop][main] Exception in pipelineworker, the pipeline stopped processing new events, please check your filter configuration and restart Logstash.

Ответы [ 2 ]

1 голос
/ 17 февраля 2020

После дополнительной отладки кода я понял, что Внутренняя ошибка сервера была вызвана строкой в ​​файле client.rb для установки «Принять» в заголовке запроса GET.

/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/schema_registry-0.1.0/lib/schema_registry/client.rb:112
        request['Accept'] = "application/vnd.schemaregistry.v1+json"

путем комментирования или изменения значение для запроса ['Accept'] = "application / json", запрос GET прошел успешно.

0 голосов
/ 11 февраля 2020

Этот код c не работает

См. Открытый выпуск - https://github.com/wvanbergen/schema_registry/issues/5


Нет причин для сериализации JSON в Avro, затем вставьте в Elasticsearch в любом случае, так как Elasticsearch хранит JSON, но если вы действительно захотите это сделать, я бы предложил вместо этого использовать Confluent Elasticsearch Kafka Connector.

Если вы даже не хотите используя Elasticsearch, я не думаю, что Logsta sh действительно нужно использовать.

K SQL поддерживает то, что вы пытаетесь сделать - https://www.confluent.io/stream-processing-cookbook/ksql-recipes/changing-data-serialization-format-json-avro/

...