Синхронизация базы данных SQL сервера и индекса Elasticsearch - PullRequest
0 голосов
/ 25 февраля 2020

Я пытаюсь синхронизировать c данные с SQL Сервера. Я попытался использовать следующую ссылку.

https://www.elastic.co/de/blog/how-to-keep-elasticsearch-synchronized-with-a-relational-database-using-logstash

Проблема здесь в том, что операторы внутри конфигурации Logsta sh предназначены для MySQL. Я пытался преобразовать оператор в SQL операторы сервера. После того, как я его запустил, файлы не индексируются. Мое утверждение в файле конфигурации Logsta sh выглядит следующим образом:

statement => "SELECT TOP 100 PERCENT *, DATEDIFF(s, '1970-01-01 00:00:00', modification_time) AS unix_ts_in_secs FROM es_table WHERE (DATEDIFF(s, '1970-01-01 00:00:00', modification_time) > :sql_last_value AND modification_time < getutcdate()) ORDER BY modification_time ASC"

И следующий вывод:

[2020-02-25T11:55:50,092][INFO ][logstash.inputs.jdbc     ][main] (0.007739s) SELECT TOP (1) count(*) AS [COUNT] FROM (SELECT TOP 100 PERCENT *, DATEDIFF(s, '1970-01-01 00:00:00', modification_time) AS unix_ts_in_secs FROM es_table WHERE (DATEDIFF(s, '1970-01-01 00:00:00', modification_time) > 0 AND modification_time < getutcdate()) ORDER BY modification_time ASC) AS [T1]
[2020-02-25T11:55:55,202][INFO ][logstash.inputs.jdbc     ][main] (0.001840s) SELECT CAST(SERVERPROPERTY('ProductVersion') AS varchar)
[2020-02-25T11:55:55,208][INFO ][logstash.inputs.jdbc     ][main] (0.001305s) SELECT TOP (1) count(*) AS [COUNT] FROM (SELECT TOP 100 PERCENT *, DATEDIFF(s, '1970-01-01 00:00:00', modification_time) AS unix_ts_in_secs FROM es_table WHERE (DATEDIFF(s, '1970-01-01 00:00:00', modification_time) > 0 AND modification_time < getutcdate()) ORDER BY modification_time ASC) AS [T1]
[2020-02-25T11:56:00,338][INFO ][logstash.inputs.jdbc     ][main] (0.002202s) SELECT CAST(SERVERPROPERTY('ProductVersion') AS varchar)
[2020-02-25T11:56:00,349][INFO ][logstash.inputs.jdbc     ][main] (0.002047s) SELECT TOP (1) count(*) AS [COUNT] FROM (SELECT TOP 100 PERCENT *, DATEDIFF(s, '1970-01-01 00:00:00', modification_time) AS unix_ts_in_secs FROM es_table WHERE (DATEDIFF(s, '1970-01-01 00:00:00', modification_time) > 0 AND modification_time < getutcdate()) ORDER BY modification_time ASC) AS [T1]

Итак, оператор выполняется, но ничего не индексируется.

Моя конфигурация Logsta sh выглядит следующим образом:

input {
  jdbc {
    jdbc_driver_library => "<driver>"
    jdbc_driver_class => "<class>"
    jdbc_connection_string => "<connection>"
    jdbc_user => <user>
    jdbc_password => <pw>
    jdbc_paging_enabled => true
    tracking_column => "unix_ts_in_secs"
    use_column_value => true
    tracking_column_type => "numeric"
    schedule => "*/5 * * * * *"
    statement => "SELECT TOP 100 PERCENT *, DATEDIFF(s, '1970-01-01 00:00:00', modification_time) AS unix_ts_in_secs FROM es_table WHERE (DATEDIFF(s, '1970-01-01 00:00:00', modification_time) > :sql_last_value AND modification_time < getutcdate()) ORDER BY modification_time ASC"
  }
}
filter {
  mutate {
    copy => { "id" => "[@metadata][_id]"}
    remove_field => ["id", "@version", "unix_ts_in_secs"]
  }
}
output {
  stdout { codec =>  "rubydebug"}
  elasticsearch {
      index => "rdbms_sync_idx"
      document_id => "%{[@metadata][_id]}"
  }
}

1 Ответ

1 голос
/ 25 февраля 2020

Может быть, вам не хватает " last_run_metadata_path "

Файл трекера используется для захвата "состояния".

Пример из официальной документации:

input {
  jdbc {
    statement => "SELECT * FROM mgd.seq_sequence WHERE _sequence_key > ? AND _sequence_key < ? + ? ORDER BY _sequence_key ASC"
    prepared_statement_bind_values => [":sql_last_value", ":sql_last_value", 4]
    prepared_statement_name => "foobar"
    use_prepared_statements => true
    use_column_value => true
    tracking_column_type => "numeric"
    tracking_column => "_sequence_key"
    last_run_metadata_path => "/elastic/tmp/testing/confs/test-jdbc-int-sql_last_value.yml"
    # ... other configuration bits
  }
}
...