Я следил за этим сообщением в блоге, чтобы реализовать встроенный sasl_ssl
https://sharebigdata.wordpress.com/2018/01/21/implementing-sasl-plain/
@SpringBootTest
@RunWith(SpringRunner.class)
@TestPropertySource(properties = {
"spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
"spring.kafka.consumer.group-id=notify-integration-test-group-id",
"spring.kafka.consumer.auto-offset-reset=earliest"
})
public class ListenerIntegrationTest2 {
static final String INBOUND = "inbound-topic";
static final String OUTBOUND = "outbound-topic";
static {
System.setProperty("java.security.auth.login.config", "src/test/java/configs/kafka/kafka_jaas.conf");
}
@ClassRule
public static final EmbeddedKafkaRule KAFKA = new EmbeddedKafkaRule(1, true, 1,
ListenerIntegrationTest2.INBOUND, ListenerIntegrationTest2.OUTBOUND)
.brokerProperty("listeners", "SASL_SSL://localhost:9092, PLAINTEXT://localhost:9093")
.brokerProperty("ssl.keystore.location", "src/test/java/configs/kafka/kafka.broker1.keystore.jks")
.brokerProperty("ssl.keystore.password", "pass")
.brokerProperty("ssl.key.password", "pass")
.brokerProperty("ssl.client.auth", "required")
.brokerProperty("ssl.truststore.location", "src/test/java/configs/kafka/kafka.broker1.truststore.jks")
.brokerProperty("ssl.truststore.password", "pass")
.brokerProperty("security.inter.broker.protocol", "SASL_SSL")
.brokerProperty("sasl.enabled.mechanisms", "PLAIN,SASL_SSL")
.brokerProperty("sasl.mechanism.inter.broker.protocol", "SASL_SSL");
Когда я использую конфигурацию PLAINTEXT: // localhost: 9093, я получаю следующее:
WARN org.apache.kafka.clients.NetworkClient - [Controller id=0, targetBrokerId=0] Connection to node 0 terminated during authentication. This may indicate that authentication failed due to invalid credentials.
Однако, когда я удаляю его, я получаю org.apache.kafka.common.KafkaException: Tried to check server's port before server was started or checked for port of non-existing protocol
Я попытался изменить тип SecurityProtocol на автообнаружение, какой стиль взаимодействия с брокером он должен использовать (он жестко задан в виде открытого текста - это, вероятно, должно быть исправлено):
if (this.kafkaPorts[i] == 0) {
this.kafkaPorts[i] = TestUtils.boundPort(server, SecurityProperties.forName(this.brokerProperties.getOrDefault("security.protocol", SecurityProtocol.PLAINTEXT).toString()); // or whatever property can give me the security protocol I should be using to communicate
}
Я все еще получаю следующую ошибку: WARN org.apache.kafka.clients.NetworkClient - [Controller id=0, targetBrokerId=0] Connection to node 0 terminated during authentication. This may indicate that authentication failed due to invalid credentials.
Есть ли способ правильно настроить встроенную кафку для включения sasl_ssl?