Как связать эластичный поиск с Postgres с помощью logstash? - PullRequest
0 голосов
/ 09 октября 2019

У меня проблемы с передачей данных из postgres в эластичные с помощью logstash. Я получаю сообщение об ошибке "block in converge_state". Сгенерированная ошибка:

Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:main, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of \r, \n at line 36, column 4 (byte 583) after # }", :backtrace=>["/usr/local/Cellar/logstash/7.3.2/libexec/logstash-core/lib/logstash/compiler.rb:41:in `compile_imperative'", "/usr/local/Cellar/logstash/7.3.2/libexec/logstash-core/lib/logstash/compiler.rb:49:in `compile_graph'", "/usr/local/Cellar/logstash/7.3.2/libexec/logstash-core/lib/logstash/compiler.rb:11:in `block in compile_sources'", "org/jruby/RubyArray.java:2577:in `map'", "/usr/local/Cellar/logstash/7.3.2/libexec/logstash-core/lib/logstash/compiler.rb:10:in `compile_sources'", "org/logstash/execution/AbstractPipelineExt.java:151:in `initialize'", "org/logstash/execution/JavaBasePipelineExt.java:47:in `initialize'", "/usr/local/Cellar/logstash/7.3.2/libexec/logstash-core/lib/logstash/java_pipeline.rb:24:in `initialize'", "/usr/local/Cellar/logstash/7.3.2/libexec/logstash-core/lib/logstash/pipeline_action/create.rb:36:in `execute'", "/usr/local/Cellar/logstash/7.3.2/libexec/logstash-core/lib/logstash/agent.rb:325:in `block in converge_state'"]}

Мой файл конфигурации выглядит следующим образом:

input {
    jdbc {

        jdbc_connection_string => "jdbc:postgresql://localhost:5432/school"


        jdbc_user => "postgres"
        jdbc_password => "postgres"


        jdbc_driver_library => "/Users/karangupta/Downloads/postgresql-42.2.8.jar"


        jdbc_driver_class => "org.postgresql.Driver"

        statement => "select sch.udise_sch_code,
    sch.school_name,
    e.edu_block_name,
    d.district_name,
    s.state_name,

    sch.school_address,
    sch.pincode,
    case when sch.sch_loc_r_u = 1 then 'rural'
        when sch.sch_loc_r_u = 2 then 'urban' end as "sch_location_type",
    sch.assembly_const,
    sch.status,

    schd.academic_year,
    schd.sch_category,
    schd.class_frm,
    schd.class_to,

    case when schd.sch_type = 1 then 'boys' 
          when schd.sch_type = 2 then 'girls' 
          else 'coed' end as "school_type",

    case when sde.is_cce_ele = 1 then 'yes'
        when sde.is_cce_ele = 2 then 'no' 
        else 'NA' end as "cce_implemented_ele",
    case when sde.is_pcr_maintained = 1 then 'yes'
        when sde.is_pcr_maintained = 2 then 'no' 
        else 'NA' end as "cce_implemented_ele",
    case when sde.is_pcr_shared_parents = 1 then 'yes'
        when sde.is_pcr_shared_parents = 2 then 'no' 
        else 'NA' end as "is_pcr_shared_parents",
    case when sde.is_cce_sec = 1 then 'yes'
        when sde.is_cce_sec = 2 then 'no' 
        else 'NA' end as "cce_implemented_sec",
    case when sde.is_cce_hrsec = 1 then 'yes'
        when sde.is_cce_hrsec = 2 then 'no' 
        else 'NA' end as "cce_implemented_hrsec",
    sde.rte_25p_applied,
    sde.rte_25p_enrolled_previous_yr,
    sde.is_smc,
    sde.smc_tot_m,
    sde.smc_tot_f,
    sde.sms_parents_m,
    sde.sms_parents_f,
    sde.smc_lgb_m
from mst_school_2017_18 sch 
inner join mst_edu_block_2017_18 e on sch.udise_edu_block_code = e.udise_edu_block_code
inner join mst_district_2017_18 d on e.udise_dist_code = d.udise_district_code
inner join mst_state_2017_18 s on d.udise_state_code = s.udise_state_code
inner join school_details_2017_18 schd on sch.udise_sch_code = schd.udise_sch_code
inner join school_details_extended_2017_18 sde on sch.udise_sch_code = sde.udise_sch_code"
    }
}


output {
    stdout { codec => json_lines }
}

# output {
#   elasticsearch {
#       index => "schoolDetails"
#       hosts => "http://localhost:9200"
#   }
# }

Я не знаю, что я делаю здесь неправильно.

Ответы [ 2 ]

1 голос
/ 09 октября 2019

Я предлагаю вам сохранить ваш многострочный оператор SQL внутри файла, а затем сослаться на этот файл в вашей конфигурации .

Поэтому вместо

statement => "select ..."

Использоватьэто:

statement_filepath => "/absolute/path/to/statement.sql" 
0 голосов
/ 09 октября 2019

Плагин ввода JDBC ожидает оператора SQL без разрывов строки . Поэтому, заменив все разрывы строк пробелами, все будет в порядке.

Я сделал это для вас через Notepad ++. Настройка оператора должна выглядеть следующим образом:

statement => "select sch.udise_sch_code, sch.school_name, e.edu_block_name, d.district_name, s.state_name, sch.school_address, sch.pincode, case when sch.sch_loc_r_u = 1 then 'rural' when sch.sch_loc_r_u = 2 then 'urban' end as "sch_location_type", sch.assembly_const, sch.status, schd.academic_year, schd.sch_category, schd.class_frm, schd.class_to, case when schd.sch_type = 1 then 'boys' when schd.sch_type = 2 then 'girls' else 'coed' end as "school_type", case when sde.is_cce_ele = 1 then 'yes' when sde.is_cce_ele = 2 then 'no' else 'NA' end as "cce_implemented_ele", case when sde.is_pcr_maintained = 1 then 'yes' when sde.is_pcr_maintained = 2 then 'no' else 'NA' end as "cce_implemented_ele", case when sde.is_pcr_shared_parents = 1 then 'yes' when sde.is_pcr_shared_parents = 2 then 'no' else 'NA' end as "is_pcr_shared_parents", case when sde.is_cce_sec = 1 then 'yes' when sde.is_cce_sec = 2 then 'no' else 'NA' end as "cce_implemented_sec", case when sde.is_cce_hrsec = 1 then 'yes' when sde.is_cce_hrsec = 2 then 'no' else 'NA' end as "cce_implemented_hrsec", sde.rte_25p_applied, sde.rte_25p_enrolled_previous_yr, sde.is_smc, sde.smc_tot_m, sde.smc_tot_f, sde.sms_parents_m, sde.sms_parents_f, sde.smc_lgb_m from mst_school_2017_18 sch inner join mst_edu_block_2017_18 e on sch.udise_edu_block_code = e.udise_edu_block_code inner join mst_district_2017_18 d on e.udise_dist_code = d.udise_district_code inner join mst_state_2017_18 s on d.udise_state_code = s.udise_state_code inner join school_details_2017_18 schd on sch.udise_sch_code = schd.udise_sch_code inner join school_details_extended_2017_18 sde on sch.udise_sch_code = sde.udise_sch_code"

РЕДАКТИРОВАТЬ:

Как указано @Val, альтернативой является загрузка оператора SQL из файла.

...