У меня есть Logsta sh conf, который читает из Kafka topi c в json формате, он использует avro_schema_registry для сериализации вывода в avro. Вот файл conf:
input {
kafka{
group_id => "test_group"
topics => ["logs_json"]
bootstrap_servers => "server2:9094, server1:9094, server3:9094"
codec => "json"
consumer_threads => 1
}
}
output {
kafka {
codec => avro_schema_registry {
endpoint => "http://host_schema_registry:port"
schema_id => 1
}
value_serializer => "org.apache.kafka.common.serialization.ByteArraySerializer"
bootstrap_servers => "server1:9094, server1:9094, server1:9094"
topic_id => "logs_avro"
}
}
Но я получаю эту ошибку:
org.jruby.exceptions.NameError: (NameError) undefined local variable or method `esponse' for #<SchemaRegistry::Client:0x3c5ad39>
at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.schema_registry_minus_0_dot_1_dot_0.lib.schema_registry.client.request(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/schema_registry-0.1.0/lib/schema_registry/cli
ent.rb:127) ~[?:?]
at uri_3a_classloader_3a_.META_minus_INF.jruby_dot_home.lib.ruby.stdlib.net.http.start(uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/net/http.rb:914) ~[?:?]
at uri_3a_classloader_3a_.META_minus_INF.jruby_dot_home.lib.ruby.stdlib.net.http.start(uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/net/http.rb:609) ~[?:?]
at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.schema_registry_minus_0_dot_1_dot_0.lib.schema_registry.client.request(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/schema_registry-0.1.0/lib/schema_registry/cli
ent.rb:101) ~[?:?]
at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.schema_registry_minus_0_dot_1_dot_0.lib.schema_registry.client.schema(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/schema_registry-0.1.0/lib/schema_registry/clie
nt.rb:40) ~[?:?]
at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_codec_minus_avro_schema_registry_minus_1_dot_1_dot_1.lib.logstash.codecs.avro_schema_registry.get_schema(/usr/share/logstash/vendor/bundle/jruby/2.5.0/g
ems/logstash-codec-avro_schema_registry-1.1.1/lib/logstash/codecs/avro_schema_registry.rb:158) ~[?:?]
at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_codec_minus_avro_schema_registry_minus_1_dot_1_dot_1.lib.logstash.codecs.avro_schema_registry.encode(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/
logstash-codec-avro_schema_registry-1.1.1/lib/logstash/codecs/avro_schema_registry.rb:246) ~[?:?]
at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_kafka_minus_10_dot_0_dot_0_minus_java.lib.logstash.outputs.kafka.multi_receive(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logs
tash-integration-kafka-10.0.0-java/lib/logstash/outputs/kafka.rb:219) ~[?:?]
at org.jruby.RubyArray.each(org/jruby/RubyArray.java:1800) ~[jruby-complete-9.2.8.0.jar:?]
at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_kafka_minus_10_dot_0_dot_0_minus_java.lib.logstash.outputs.kafka.multi_receive(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logs
tash-integration-kafka-10.0.0-java/lib/logstash/outputs/kafka.rb:217) ~[?:?]
at org.logstash.config.ir.compiler.OutputStrategyExt$AbstractOutputStrategyExt.multi_receive(org/logstash/config/ir/compiler/OutputStrategyExt.java:118) ~[logstash-core.jar:?]
at org.logstash.config.ir.compiler.AbstractOutputDelegatorExt.multi_receive(org/logstash/config/ir/compiler/AbstractOutputDelegatorExt.java:101) ~[logstash-core.jar:?]
at usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline.start_workers(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:262) ~[?:?]
[2020-02-11T13:11:41,720][ERROR][org.logstash.execution.WorkerLoop][main] Exception in pipelineworker, the pipeline stopped processing new events, please check your filter configuration and restart Logstash.