logstash JDBC Плагин Полиморфный Индекс - PullRequest
0 голосов
/ 11 июля 2019

JDBC Плагин Полиморфный Индекс

Привет, у нас есть таблица, которая полиморфна элементам, и мы хотели бы найти способ обновить разные индексы в одной конфигурации logstash.

Структура таблицы

Ниже приведен пример таблицы. Столбец item_type обозначает тип (например, Pen, Post, Collection), item_id является внешним ключом для элемента в нашей БД, а score вычисляется на кроне и обновляется время от времени, что обновляет наш столбец updated_at.

popularity_scores

Процесс

Используя плагины logstash jdbc, мы хотели бы запросить данные, а затем отправить их в ES. Однако я не вижу способа (кроме конфигурации logstash и SQL-запросов для каждого типа элементов) динамически выдвигать обновления для индексов. В идеальном мире мы хотели бы получить данные из таблицы выше (см. Код ниже)

* * Вход тысяча двадцать-одина * * тысяча двадцать-дв
    input {
        jdbc {
            jdbc_driver_library => "/usr/share/logstash/bin/mysql-connector-java-8.0.15.jar"
            jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
            # useCursorFetch needed cause jdbc_fetch_size not working??
            # https://discuss.elastic.co/t/logstash-jdbc-plugin/84874/2
            # https://stackoverflow.com/a/10772407
            jdbc_connection_string => "jdbc:mysql://${CP_LS_SQL_HOST}:${CP_LS_SQL_PORT}/${CP_LS_SQL_DB}?useCursorFetch=true&autoReconnect=true&failOverReadOnly=false&maxReconnects=10"
            statement => "select * from view_elastic_popularity_all where updated_at > :sql_last_value"
            jdbc_user => "${CP_LS_SQL_USER}"
            jdbc_password => "${CP_LS_SQL_PASSWORD}"
            jdbc_fetch_size => "${CP_LS_FETCH_SIZE}"
            last_run_metadata_path => "/usr/share/logstash/cp/last_run_files/last_run_popularity_live"
            jdbc_page_size => '10000'
            use_column_value => true
            tracking_column => 'updated_at'
            tracking_column_type => 'timestamp'
            schedule => "* * * * *"
        }
    }

Затем выполните запросы на обновление ES через плагин вывода (см. Код вывода ниже)

выход

    output {
      elasticsearch {
          index => "HOW_DO_WE_DYNAMICALLY_SET_INDEX_BASED_ON_ITEM_TYPE?"
          document_id => "%{id}"
          hosts => ["${CP_LS_ES_HOST}:${CP_LS_ES_PORT}"]
          user => "${CP_LS_ES_USER}"
          password => "${CP_LS_ES_PASSWORD}"
      }
    }

Помощь

Мы не можем быть первой компанией с этой проблемой. Как бы мы структурировали вывод?

1 Ответ

0 голосов
/ 11 июля 2019

Вы можете динамически установить имя индекса, используя поле в сообщении о событии таким же образом, как вы динамически устанавливаете идентификатор_документа.

output {
  elasticsearch {
      index => "%{item_type}"
      document_id => "%{id}"
      hosts => ["${CP_LS_ES_HOST}:${CP_LS_ES_PORT}"]
      user => "${CP_LS_ES_USER}"
      password => "${CP_LS_ES_PASSWORD}"
  }
}
...