У меня есть logstash-6.5.4 (с ssl), web и планировщик в моем локальном (имя хоста: webbox) и kafka-2.0 (с ssl) на другом (имя хоста: kafkabox).
Я не могу получить сообщение в теме кафки, когда сообщение отправлено из logstash.
Ни одно сообщение об ошибке не отображается, сообщение не отправляется в тему кафки. Я попытался импортировать logstash.crt
в склад доверенных прав Кафки, но это также не сработало.
Создано logstash.crt
и logstash.key
с приведенной ниже командой.
sudo openssl req -x509 -batch -nodes -days 3650 -newkey rsa:2048 -keyout /etc/logstash/logstash.key -out /etc/logstash/logstash.crt
Импортировал logstash.crt в файл хранилища доверенных сертификатов Кафки и попытался.
keytool -import -alias logstash -file logstash.crt -keystore cacerts
Файл конфигурации Logstash приведен ниже ...
input {
tcp {
host=>"0.0.0.0"
port=>5514
type=>"syslogType"
ssl_enable=>true
ssl_cert=>"/etc/logstash/logstash.crt"
ssl_key=>"/etc/logstash/logstash.key"
ssl_verify=>false
}
}
filter {
}
output {
kafka {
bootstrap_servers=>"kafkabox:9093"
codec=>"json_lines"
topic_id=>"a_test"
ssl_keystore_location=>"keystore file"
ssl_keystore_password=>"changeit"
ssl_key_password=>"changeit"
ssl_truststore_location=>"truststore file"
ssl_truststore_password=>"changeit"
security_protocol=>"SSL"
}
}
Ожидаемое сообщение отправляется из logstash (с SSL) на kafka (с SSL).
Java-код для подключения к logstash, который внутренне не смог отправить сообщение темам kafka (в режиме ssl).
public class LogstashClient {
private static String message = "<86>Jun 25 14:32:25 webbox sshd[7517]: Failed password for root from 196.165.132.192 port 45691 ssh2";
public static void main(String[] args) throws Exception {
nonSSL();
//SSL();
}
private static void SSL() throws Exception {
// logstash.crt is directly imported into kafka's truststore
// Below <<Client Truststore>> will also have logstash.crt imported for handshaking while connecting
System.setProperty("javax.net.ssl.trustStore", "<<Client Truststore>>");
System.setProperty("javax.net.ssl.trustStorePassword", "test1234");
SSLSocketFactory factory = (SSLSocketFactory)SSLSocketFactory.getDefault();
SSLSocket socket = (SSLSocket) factory.createSocket("localhost", 5514);
System.out.println("Handshaking...");
socket.startHandshake();
PrintWriter printWriter = new PrintWriter(socket.getOutputStream(), true);
boolean checkError = printWriter.checkError();
printWriter.println(message);
}
private static void nonSSL() throws Exception {
Socket socket = new Socket("localhost", 5514);
PrintWriter printWriter = new PrintWriter(socket.getOutputStream(), true);
printWriter.println(message);
}
}
Спасибо,
RK