SASL_SSL интеграция с EmbeddedKafka - PullRequest
0 голосов
/ 07 июня 2019

Я следил за этим сообщением в блоге, чтобы реализовать встроенный sasl_ssl https://sharebigdata.wordpress.com/2018/01/21/implementing-sasl-plain/

@SpringBootTest
@RunWith(SpringRunner.class)
@TestPropertySource(properties = {
        "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
        "spring.kafka.consumer.group-id=notify-integration-test-group-id",
        "spring.kafka.consumer.auto-offset-reset=earliest"
})
public class ListenerIntegrationTest2 {
    static final String INBOUND = "inbound-topic";
    static final String OUTBOUND = "outbound-topic";

    static {
        System.setProperty("java.security.auth.login.config", "src/test/java/configs/kafka/kafka_jaas.conf");
    }

    @ClassRule
    public static final EmbeddedKafkaRule KAFKA = new EmbeddedKafkaRule(1, true, 1,
            ListenerIntegrationTest2.INBOUND, ListenerIntegrationTest2.OUTBOUND)
            .brokerProperty("listeners", "SASL_SSL://localhost:9092, PLAINTEXT://localhost:9093")
            .brokerProperty("ssl.keystore.location", "src/test/java/configs/kafka/kafka.broker1.keystore.jks")
            .brokerProperty("ssl.keystore.password", "pass")
            .brokerProperty("ssl.key.password", "pass")
            .brokerProperty("ssl.client.auth", "required")
            .brokerProperty("ssl.truststore.location", "src/test/java/configs/kafka/kafka.broker1.truststore.jks")
            .brokerProperty("ssl.truststore.password", "pass")
            .brokerProperty("security.inter.broker.protocol", "SASL_SSL")
            .brokerProperty("sasl.enabled.mechanisms", "PLAIN,SASL_SSL")
            .brokerProperty("sasl.mechanism.inter.broker.protocol", "SASL_SSL");

Когда я использую конфигурацию PLAINTEXT: // localhost: 9093, я получаю следующее: WARN org.apache.kafka.clients.NetworkClient - [Controller id=0, targetBrokerId=0] Connection to node 0 terminated during authentication. This may indicate that authentication failed due to invalid credentials.

Однако, когда я удаляю его, я получаю org.apache.kafka.common.KafkaException: Tried to check server's port before server was started or checked for port of non-existing protocol

Я попытался изменить тип SecurityProtocol на автообнаружение, какой стиль взаимодействия с брокером он должен использовать (он жестко задан в виде открытого текста - это, вероятно, должно быть исправлено):

   if (this.kafkaPorts[i] == 0) {
      this.kafkaPorts[i] = TestUtils.boundPort(server, SecurityProperties.forName(this.brokerProperties.getOrDefault("security.protocol", SecurityProtocol.PLAINTEXT).toString()); // or whatever property can give me the security protocol I should be using to communicate
   }

Я все еще получаю следующую ошибку: WARN org.apache.kafka.clients.NetworkClient - [Controller id=0, targetBrokerId=0] Connection to node 0 terminated during authentication. This may indicate that authentication failed due to invalid credentials.

Есть ли способ правильно настроить встроенную кафку для включения sasl_ssl?

...