Я использую Logstash для переноса данных из mysql вasticsearch. Моя база данных mysql имеет первичную таблицу с именем product, у которой много связей, и запрос, который нужно выбрать, содержит около 46 левых внешних соединений, и в результате возвращается очень большие (50 тыс.) Строк для одной записи. Поэтому я планирую разделить запрос на несколько вариантов выбора. Я использовал jdbc_streaming плагин Logstash. Однако мне интересно, если мое решение логично и правильно?
Это простой конфигурационный файл, описывающий мою реализацию (не для всех отношений):
input {
jdbc {
jdbc_driver_library => "mysql-connector-java-5.1.47-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/my-conn?useSSL=false&allowPublicKeyRetrieval=true"
jdbc_user => "root"
jdbc_password => "root"
schedule => "* * * * *"
#jdbc_paging_enabled => true
#jdbc_page_size => 10000
#jdbc_fetch_size => 5
statement => "select product_id from product"
clean_run => false
}
}
filter {
jdbc_streaming {
jdbc_driver_library => "mysql-connector-java-5.1.47-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/my-conn?useSSL=false&allowPublicKeyRetrieval=true"
jdbc_user => "root"
jdbc_password => "root"
statement => "select * from product_translation where product_id = :id"
parameters => { "id" =>"product_id"}
target => "productTranslations"
}
aggregate {
task_id => "%{product_id}"
code => "
require 'C:\elasticsearch\replication\product-replication\demo.rb' ;
ProductMapping.startAggregate(event, map)
"
push_previous_map_as_event => true
timeout => 5
timeout_tags => ["aggregate"]
}
if "aggregate" not in [tags] {
drop{}
}
}
output {
elasticsearch {
hosts => ["localhost:9200"]
document_id => "%{productId}"
document_type => "product"
index => "test-products"
}
stdout { codec => rubydebug }
}