Если вы используете id
для выбора строк, вы не сможете этого сделать.У вас есть 2 варианта:
каждый раз выбирать все строки и отправлять их в ES с помощью запроса SELECT * FROM blog_pro
, что, я думаю, не подходит для вашего сценария.
создайте новый столбец last_modified_time
, который будет содержать последнюю измененную метку времени записи (строки).затем используйте это для фильтрации строк.обратите внимание на свойство tracking_column_type => "timestamp"
statement =>"SELECT * FROM blog_pro WHERE last_modiefied_time >:sql_last_value"
use_column_value =>true
tracking_column =>last_modified_time
tracking_column_type => "timestamp"
здесь приведена полная конфигурация logstash
input {
jdbc {
jdbc_connection_string =>"jdbc:mysql://192.168.3.57:3306/blog_pro"
jdbc_user =>"dush"
jdbc_password =>"dush"
jdbc_driver_library =>"F:\logstash-6.2.2\bin\mysql-connector-java-5.1.6.jar"
jdbc_driver_class =>"com.mysql.jdbc.Driver"
schedule =>"* * * * *"
statement =>"SELECT * FROM blog_pro WHERE last_modified_time >:sql_last_value"
use_column_value =>true
tracking_column =>last_modified_time
tracking_column_type => "timestamp"
}
}
output
{
#output to elasticsearch
elasticsearch {
hosts => [ "192.168.1.245:9201" ]
action=>update
# "%{id}" - > primary key of the table
document_id => "%{id}"
doc_as_upsert =>true
}
}
примечание, вам может потребоваться очистить индекс иначать индексирование с этой конфигурацией.Я проверил это и работает отлично.
Версия Elasticsearch = 5.xx
версия logstash = 6.2.2