Как я могу динамически обновлять свои данные из MySQL с помощью logstash (без дубликатов)? - PullRequest
0 голосов
/ 29 января 2019

Я настроил logstash.conf для динамической вставки данных моей базы данных, но проблема заключается в следующем:

, когда я изменяю строку в моей таблице, эта строка не обновляется в моем индексе, потому что я только вставляю новые значенияпосле sql_last_value я хоть и про нас триггер но не знаю как мне это сделать.

input {
  jdbc { 
    jdbc_connection_string =>"jdbc:mysql://localhost:3306/blog"
    jdbc_user =>"root"
    jdbc_password =>""
    jdbc_driver_library =>"C:\Users\saidb\Downloads\mysql-connector-java-5.1.47\mysql-connector-java-5.1.47.jar"
    jdbc_driver_class =>"com.mysql.jdbc.Driver"
    schedule =>"* * * * *"
    statement =>"SELECT * FROM blog_pro WHERE id >:sql_last_value"
    use_column_value =>true
    tracking_column =>id
    }
  }
output {
  elasticsearch {
    hosts =>"localhost:9200"
    index =>"blog_pro"
    document_type =>"data"
  }
}

1 Ответ

0 голосов
/ 30 января 2019

Если вы используете id для выбора строк, вы не сможете этого сделать.У вас есть 2 варианта:

  1. каждый раз выбирать все строки и отправлять их в ES с помощью запроса SELECT * FROM blog_pro, что, я думаю, не подходит для вашего сценария.

  2. создайте новый столбец last_modified_time, который будет содержать последнюю измененную метку времени записи (строки).затем используйте это для фильтрации строк.обратите внимание на свойство tracking_column_type => "timestamp"

statement =>"SELECT * FROM blog_pro WHERE last_modiefied_time >:sql_last_value" use_column_value =>true tracking_column =>last_modified_time tracking_column_type => "timestamp"

здесь приведена полная конфигурация logstash

input { 

 jdbc { 
    jdbc_connection_string =>"jdbc:mysql://192.168.3.57:3306/blog_pro"
    jdbc_user =>"dush"
    jdbc_password =>"dush"
    jdbc_driver_library =>"F:\logstash-6.2.2\bin\mysql-connector-java-5.1.6.jar"
    jdbc_driver_class =>"com.mysql.jdbc.Driver"
    schedule =>"* * * * *"
    statement =>"SELECT * FROM blog_pro WHERE last_modified_time  >:sql_last_value"
    use_column_value =>true
    tracking_column =>last_modified_time
    tracking_column_type => "timestamp"
    } 
 }

output 
{ 
    #output to elasticsearch    
    elasticsearch {
        hosts => [ "192.168.1.245:9201" ]
        action=>update
        # "%{id}" - > primary key of the table 
        document_id => "%{id}"
        doc_as_upsert =>true
    }

}

примечание, вам может потребоваться очистить индекс иначать индексирование с этой конфигурацией.Я проверил это и работает отлично.

Версия Elasticsearch = 5.xx

версия logstash = 6.2.2

...