Настройка logstash с ssl и отправка сообщения в тему kafka (с ssl) - PullRequest
0 голосов
/ 21 июня 2019

У меня есть logstash-6.5.4 (с ssl), web и планировщик в моем локальном (имя хоста: webbox) и kafka-2.0 (с ssl) на другом (имя хоста: kafkabox).

Я не могу получить сообщение в теме кафки, когда сообщение отправлено из logstash.

Ни одно сообщение об ошибке не отображается, сообщение не отправляется в тему кафки. Я попытался импортировать logstash.crt в склад доверенных прав Кафки, но это также не сработало.

Создано logstash.crt и logstash.key с приведенной ниже командой.

sudo openssl req -x509 -batch -nodes -days 3650 -newkey rsa:2048 -keyout /etc/logstash/logstash.key -out /etc/logstash/logstash.crt

Импортировал logstash.crt в файл хранилища доверенных сертификатов Кафки и попытался.

keytool -import -alias logstash -file logstash.crt -keystore cacerts

Файл конфигурации Logstash приведен ниже ...

input {
    tcp {
    host=>"0.0.0.0" 
    port=>5514 
    type=>"syslogType"
    ssl_enable=>true
    ssl_cert=>"/etc/logstash/logstash.crt"
    ssl_key=>"/etc/logstash/logstash.key"
    ssl_verify=>false
    }
}

filter {
}

output {
    kafka {
    bootstrap_servers=>"kafkabox:9093"
    codec=>"json_lines"
    topic_id=>"a_test"
    ssl_keystore_location=>"keystore file"
    ssl_keystore_password=>"changeit"
    ssl_key_password=>"changeit"
    ssl_truststore_location=>"truststore file"
    ssl_truststore_password=>"changeit"
    security_protocol=>"SSL"
    }
}

Ожидаемое сообщение отправляется из logstash (с SSL) на kafka (с SSL).

Java-код для подключения к logstash, который внутренне не смог отправить сообщение темам kafka (в режиме ssl).

public class LogstashClient {

    private static String message = "<86>Jun 25 14:32:25 webbox sshd[7517]: Failed password for root from 196.165.132.192 port 45691 ssh2";

    public static void main(String[] args) throws Exception {
        nonSSL();
        //SSL();
    }

    private static void SSL() throws Exception {

    // logstash.crt is directly imported into kafka's truststore
    // Below <<Client Truststore>> will also have logstash.crt imported for handshaking while connecting

        System.setProperty("javax.net.ssl.trustStore", "<<Client Truststore>>");
        System.setProperty("javax.net.ssl.trustStorePassword", "test1234");

        SSLSocketFactory factory = (SSLSocketFactory)SSLSocketFactory.getDefault();
        SSLSocket socket = (SSLSocket) factory.createSocket("localhost", 5514);

    System.out.println("Handshaking...");
        socket.startHandshake();
        PrintWriter printWriter = new PrintWriter(socket.getOutputStream(), true);

        boolean checkError = printWriter.checkError();

        printWriter.println(message);
    }

    private static void nonSSL() throws Exception {
        Socket socket = new Socket("localhost", 5514);

        PrintWriter printWriter = new PrintWriter(socket.getOutputStream(), true);
        printWriter.println(message);
    }
}

Спасибо, RK

...