Добавление пользовательской аутентификации jar в kafka - PullRequest
0 голосов
/ 29 октября 2019

В настоящее время я использую PlainLoginModule для аутентификации пользователей. Однако теперь я создал jar с кодом, указанным здесь, и хочу использовать его вместо PlainLoginModule: https://cwiki.apache.org/confluence/display/KAFKA/KIP-86%3A+Configurable+SASL+callback+handlers#KIP-86:ConfigurableSASLcallbackhandlers-sample_plainSampleCallbackHandlerforSASL/PLAIN.

Я поместил файл jar в папку ~ / libs и добавил listener.name.sasl_ssl.plain.sasl.server.callback.handler.class=com.synopsys.demo.DemoApplication

для моего server.properties и моего kafka_server_jaas.conf в:

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret" 
    user_admin="admin-secret";
};

И когда я запускаю свой сервер, я получаю ошибку:

Часть 1:

14:36:41.924 [main] DEBUG org.apache.kafka.common.network.Selector - [KafkaServer id=1] Successfully authenticated with swe-analyticsdb-prod2/10.15.164.233
14:36:41.924 [Controller-1-to-broker-1-send-thread] DEBUG org.apache.kafka.common.network.Selector - [Controller id=1, targetBrokerId=1] Successfully authenticated with swe-analyticsdb-prod2/10.15.164.233
14:36:41.924 [Controller-1-to-broker-1-send-thread] INFO kafka.controller.RequestSendThread - [RequestSendThread controllerId=1] Controller 1 connected to swe-analyticsdb-prod2:9093 (id: 1 rack: null) for sending state change requests
14:36:41.925 [data-plane-kafka-network-thread-1-ListenerName(SASL_SSL)-SASL_SSL-1] DEBUG org.apache.kafka.common.network.Selector - [SocketServer brokerId=1] Connection with swe-analyticsdb-prod2.internal.synopsys.com/10.15.164.233 disconnected
java.io.EOFException: null
>---at org.apache.kafka.common.network.SslTransportLayer.read(SslTransportLayer.java:573)
>---at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:94)
>---at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
>---at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
>---at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
>---at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
>---at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
>---at kafka.network.Processor.poll(SocketServer.scala:830)
>---at kafka.network.Processor.run(SocketServer.scala:730)
>---at java.lang.Thread.run(Thread.java:748)
14:36:41.925 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name connections-closed:
14:36:41.925 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name connections-created:
14:36:41.925 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name successful-authentication:
14:36:41.925 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name successful-reauthentication:
14:36:41.925 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name successful-authentication-no-reauth:
14:36:41.926 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name failed-authentication:
14:36:41.926 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name failed-reauthentication:
14:36:41.926 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name reauthentication-latency:
14:36:41.926 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name bytes-sent-received:
14:36:41.927 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name bytes-sent:
14:36:41.927 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name bytes-received:
14:36:41.927 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name select-time:
14:36:41.927 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name io-time:
14:36:41.928 [main] WARN kafka.utils.CoreUtils$ - org.apache.kafka.common.requests.ControlledShutdownRequest$Builder.<init>(IJS)V
java.lang.NoSuchMethodError: org.apache.kafka.common.requests.ControlledShutdownRequest$Builder.<init>(IJS)V
>---at kafka.server.KafkaServer.doControlledShutdown$1(KafkaServer.scala:520)
>---at kafka.server.KafkaServer.controlledShutdown(KafkaServer.scala:563)
>---at kafka.server.KafkaServer.$anonfun$shutdown$2(KafkaServer.scala:585)
>---at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:86)
>---at kafka.server.KafkaServer.shutdown(KafkaServer.scala:585)
>---at kafka.server.KafkaServer.startup(KafkaServer.scala:342)
>---at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:38)
>---at kafka.Kafka$.main(Kafka.scala:75)
>---at kafka.Kafka.main(Kafka.scala)
14:36:41.929 [main] INFO kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread - [/config/changes-event-process-thread]: Shutting down
14:36:41.929 [/config/changes-event-process-thread] INFO kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread - [/config/changes-event-process-thread]: Stopped
14:36:41.929 [main] INFO kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread - [/config/changes-event-process-thread]: Shutdown completed
14:36:41.930 [main] INFO kafka.network.SocketServer - [SocketServer brokerId=1] Stopping socket server request processors
14:36:41.931 [data-plane-kafka-socket-acceptor-ListenerName(SASL_SSL)-SASL_SSL-9093] DEBUG kafka.network.Acceptor - Closing server socket and selector.
14:36:41.933 [data-plane-kafka-network-thread-1-ListenerName(SASL_SSL)-SASL_SSL-0] DEBUG kafka.network.Processor - Closing selector - processor 0
14:36:41.934 [data-plane-kafka-network-thread-1-ListenerName(SASL_SSL)-SASL_SSL-0] DEBUG kafka.network.Processor - Closing selector connection 10.15.164.233:9093-10.15.164.233:44774-0
14:36:41.935 [Controller-1-to-broker-1-send-thread] DEBUG org.apache.kafka.common.network.Selector - [Controller id=1, targetBrokerId=1] Connection with swe-analyticsdb-prod2/10.15.164.233 disconnected

Часть 2:

07:22:21.223 [main] DEBUG kafka.utils.KafkaScheduler - Shutting down task scheduler.
07:22:21.223 [main] INFO kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper - [ExpirationReaper-1-Heartbeat]: Shutting down
07:22:21.254 [Controller-1-to-broker-1-send-thread] DEBUG org.apache.kafka.clients.NetworkClient - [Controller id=1, targetBrokerId=1] Initiating connection to node swe-analyticsdb-prod2:9093 (id: 1 rack: null) using address swe-analyticsdb-prod2/10.15.164.233
07:22:21.254 [Controller-1-to-broker-1-send-thread] DEBUG org.apache.kafka.common.security.authenticator.SaslClientAuthenticator - Set SASL client state to SEND_APIVERSIONS_REQUEST
07:22:21.254 [Controller-1-to-broker-1-send-thread] DEBUG org.apache.kafka.common.security.authenticator.SaslClientAuthenticator - Creating SaslClient: client=null;service=kafka;serviceHostname=swe-analyticsdb-prod2;mechs=[PLAIN]
07:22:21.255 [Controller-1-to-broker-1-send-thread] DEBUG org.apache.kafka.common.network.Selector - [Controller id=1, targetBrokerId=1] Connection with swe-analyticsdb-prod2/10.15.164.233 disconnected
java.net.ConnectException: Connection refused
>---at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>---at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
>---at org.apache.kafka.common.network.SslTransportLayer.finishConnect(SslTransportLayer.java:119)
>---at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:216)
>---at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:531)
>---at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
>---at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:539)
>---at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:74)
>---at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:282)
>---at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:236)
>---at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
07:22:21.255 [Controller-1-to-broker-1-send-thread] DEBUG org.apache.kafka.clients.NetworkClient - [Controller id=1, targetBrokerId=1] Node 1 disconnected.
07:22:21.255 [Controller-1-to-broker-1-send-thread] WARN org.apache.kafka.clients.NetworkClient - [Controller id=1, targetBrokerId=1] Connection to node 1 (swe-analyticsdb-prod2/10.15.164.233:9093) could not be established. Broker may not be available.
07:22:21.256 [Controller-1-to-broker-1-send-thread] WARN kafka.controller.RequestSendThread - [RequestSendThread controllerId=1] Controller 1's connection to broker swe-analyticsdb-prod2:9093 (id: 1 rack: null) was unsuccessful
java.io.IOException: Connection to swe-analyticsdb-prod2:9093 (id: 1 rack: null) failed.
>---at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71)
>---at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:282)
>---at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:236)
>---at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
07:22:21.356 [Controller-1-to-broker-1-send-thread] DEBUG org.apache.kafka.clients.NetworkClient - [Controller id=1, targetBrokerId=1] Initiating connection to node swe-analyticsdb-prod2:9093 (id: 1 rack: null) using address swe-analyticsdb-prod2/10.15.164.233
07:22:21.356 [Controller-1-to-broker-1-send-thread] DEBUG org.apache.kafka.common.security.authenticator.SaslClientAuthenticator - Set SASL client state to SEND_APIVERSIONS_REQUEST
07:22:21.356 [Controller-1-to-broker-1-send-thread] DEBUG org.apache.kafka.common.security.authenticator.SaslClientAuthenticator - Creating SaslClient: client=null;service=kafka;serviceHostname=swe-analyticsdb-prod2;mechs=[PLAIN]
07:22:21.357 [Controller-1-to-broker-1-send-thread] DEBUG org.apache.kafka.common.network.Selector - [Controller id=1, targetBrokerId=1] Connection with swe-analyticsdb-prod2/10.15.164.233 disconnected
java.net.ConnectException: Connection refused
>---at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>---at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
>---at org.apache.kafka.common.network.SslTransportLayer.finishConnect(SslTransportLayer.java:119)
>---at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:216)
>---at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:531)
>---at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
>---at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:539)
>---at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:74)
>---at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:282)
>---at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:236)
>---at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
07:22:21.357 [Controller-1-to-broker-1-send-thread] DEBUG org.apache.kafka.clients.NetworkClient - [Controller id=1, targetBrokerId=1] Node 1 disconnected.
07:22:21.357 [Controller-1-to-broker-1-send-thread] WARN org.apache.kafka.clients.NetworkClient - [Controller id=1, targetBrokerId=1] Connection to node 1 (swe-analyticsdb-prod2/10.15.164.233:9093) could not be established. Broker may not be available.

ОБНОВЛЕНИЕ Я замечаю, что такое поведение происходит, даже если я не использую созданный мною jar / класс, а просто оставляю его внутриКаталог "../libs". Вышеуказанная ошибка всегда будет возникать с использованием встроенных или пользовательских классов AuthenticateCallBackHandler.

Я пропускаю шаг / шаги? Я знаю, что должен добавить банку в Kafka, чтобы он мог ее распознать и использовать, но я не вижу никаких учебных пособий / документации, объясняющих, как использовать пользовательский обработчик обратного вызова с PLAIN. Кто-нибудь знает, как это сделать?

Я использую Kafka 2.2

Мой пользовательский код класса:

import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.plain.PlainAuthenticateCallback;
import kafka.common.KafkaException;

import javax.naming.AuthenticationNotSupportedException;
import javax.naming.Context;
import javax.naming.NamingException;
import javax.naming.directory.DirContext;
import javax.naming.directory.InitialDirContext;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.login.AppConfigurationEntry;
import java.io.IOException;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;


public class CustomCallback implements AuthenticateCallbackHandler {
    @Override
    public void configure(Map<String, ?> configs, String mechanism, List<AppConfigurationEntry> jaasConfigEntries) {

    }
    @Override
    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
        String username = null;
        for (Callback callback: callbacks) {
            if (callback instanceof NameCallback)
                username = ((NameCallback) callback).getDefaultName();
            else if (callback instanceof PlainAuthenticateCallback) {
                PlainAuthenticateCallback plainCallback = (PlainAuthenticateCallback) callback;
                boolean authenticated = authenticate(username, plainCallback.password());
                plainCallback.authenticated(authenticated);
            } else
                throw new UnsupportedCallbackException(callback);
        }
    }
    protected boolean authenticate(String username, char[] password) throws IOException {
        if (username == null)
            return false;
        else {
            // Return true if password matches expected password
            Hashtable<String, String> environment = new Hashtable<String, String>();
            System.out.println("Custom class is being called");
            environment.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.ldap.LdapCtxFactory");
            environment.put(Context.PROVIDER_URL, "ldap://adldap.internal.synopsys.com:389");
            environment.put(Context.SECURITY_AUTHENTICATION, "simple");
            environment.put(Context.SECURITY_PRINCIPAL, "CN=" + username+",CN=Users,DC=internal,DC=synopsys,DC=com");
            environment.put(Context.SECURITY_CREDENTIALS, new String(password));

            try
            {
                DirContext context = new InitialDirContext(environment);
                context.getEnvironment();
                context.close();
                return true;
            }
            catch (AuthenticationNotSupportedException exception)
            {
                System.out.println("The authentication is not supported by the server");
                return false;
            }

            catch (AuthenticationException exception)
            {
                System.out.println("Incorrect password or username");
                return false;
            }

            catch (NamingException exception)
            {
                System.out.println("Error when trying to create the context");
                return false;
            }
        }

    }
    @Override
    public void close() throws KafkaException {
    }


    public static void main(String[] args) throws IOException {
        char[] pass = new char[]{'P', '0', 'm', 'e', 'l', '0', '2', '0', '1', '9', '!'};
        CustomCallback test = new CustomCallback();
        System.out.println(test.authenticate("<username>",pass));
        System.out.println(test.getClass().getName());
        //SpringApplication.run(DemoApplication.class, args);
    }
}

server.properties содержимое:

advertised.listeners=SASL_SSL://<machine name>:9093
ssl.endpoint.identification.algorithm=HTTPS
ssl.client.auth=required
ssl.truststore.location=/remote/sde108/kafka/kafka/SSL2/client/server.truststore.jks
ssl.truststore.password=password
ssl.keystore.location=/remote/sde108/kafka/kafka/SSL2/client/server.keystore.jks
ssl.keystore.password=password
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

zookeeper.set.acl=false
listeners=SASL_SSL://<machine name>:9093
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
offsets.retention.minutes=1
#listener.name.sasl_sasl.plain.sasl.server.callback.handler.class=<package name>.CustomCallbackApplication

pom.xml:

<?xml version="1.0" encoding="UTF-8"?>

http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0

<groupId>synopsys</groupId>
<artifactId>synopsys</artifactId>
<version>1.0-SNAPSHOT</version>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>6</source>
                <target>6</target>
            </configuration>
        </plugin>
    </plugins>
</build>

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.12</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.25</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.25</version>
    </dependency>
</dependencies>

enter image description here метаинформация былавыполнено с использованием структуры проекта и артефактов сборки

1 Ответ

0 голосов
/ 31 октября 2019

Я думаю, что отвечу вам по электронной почте два дня назад. Я настраивал механизм аутентификации SASL / PLAIN, сохраняя имя пользователя / пароль в mysql вместо файла. Я также нахожу, что KIP-86 очень сбивает с толку, потому что он предоставляет разные способы сделать одно и то же и не рассказывать о различиях между ними.

Это то, что я делаю и что работает.

  1. Интерфейс, который я реализовал, - AuthenticateCallbackHandler

  2. Сгенерированный jar не должен быть помещен в ~ / libs. Существует подкаталог lib, в который вы устанавливаете Kafka.

  3. Я не изменял kafka_server_jaas.conf

...