Kafka Custom AuthenticateCallbackHandler - PullRequest
0 голосов
/ 18 мая 2018

Я пытался реализовать AuthenticateCallbackHandler, готовый к выпуску в Kafka 2.0.0, но безрезультатно - это установка, в которой он должен работать?

Вкл. https://cwiki.apache.org/confluence/display/KAFKA/KIP-86%3A+Configurable+SASL+callback+handlers Я прочитал:

Использование внешнего сервера аутентификации для аутентификации SASL / PLAIN с использованием реализации SaslServer для PLAIN, включенной в Kafka

Определение нового класса, который реализует AuthenticateCallbackHandler, который обрабатывает NameCallback и PlainAuthenticateCallbackи добавьте класс к свойству брокера sasl.server.callback.handler.class.Для посредника будет создан один экземпляр этого обработчика обратного вызова.Настроенный обработчик обратного вызова отвечает за проверку пароля, предоставленного клиентами, и он может использовать внешний сервер аутентификации.

Итак, в основном я создал класс:

package com.example;

import javax.security.auth.callback.Callback;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.login.AppConfigurationEntry;
import java.io.IOException;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.plain.PlainAuthenticateCallback;

public class CustomAuthenticateCallbackHandler implements AuthenticateCallbackHandler {
    private List<AppConfigurationEntry> jaasConfigEntries;

    public void configure(Map<String, ?> configs, String mechanism, List<AppConfigurationEntry> jaasConfigEntries) {
        this.jaasConfigEntries = jaasConfigEntries;
    }

    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
        String username = null;
        for (Callback callback: callbacks) {
            if (callback instanceof NameCallback)
                username = ((NameCallback) callback).getDefaultName();
            else if (callback instanceof PlainAuthenticateCallback) {
                PlainAuthenticateCallback plainCallback = (PlainAuthenticateCallback) callback;
                boolean authenticated = authenticate(username, plainCallback.password());
                plainCallback.authenticated(authenticated);
            } else
                throw new UnsupportedCallbackException(callback);
        }
    }

    protected boolean authenticate(String username, char[] password) throws IOException {
        return username != null && username.equals("test") && new String(password).equals("test");
    }

    public void close() throws KafkaException {
    }
}

Создайте флягу и сделайте ее доступной для Кафки, как в этом docker-compose.yml:

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.0.0-beta1-1
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_SASL_ENABLED: "false"

  kafka:
    image: confluentinc/cp-kafka:5.0.0-beta1-1
    depends_on:
      - zookeeper
    volumes:
      - ./security:/etc/kafka/secrets
      - ./jars:/etc/kafka/jars
    ports:
      - "9092:9092"
    environment:
      CLASSPATH: /etc/kafka/jars/*
      ZOOKEEPER_SASL_ENABLED: "false"
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: SASL_PLAINTEXT://kafka:9092
      KAFKA_SECURITY_INTER_BROKER_PROTOCOL: SASL_PLAINTEXT
      KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.auth.SimpleAclAuthorizer
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "false"
      KAFKA_SUPER_USERS: User:admin
      KAFKA_OPTS: -Djava.security.auth.login.config=/etc/kafka/secrets/broker_jaas.conf
      KAFKA_SASL_SERVER_CALLBACK_HANDLER_CLASS: com.example.CustomAuthenticateCallbackHandler

broker_jaas.conf:

KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="admin-secret"
  user_admin="admin-secret"
  ;
};

cli-client.properties:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="test" \
  password="test";

Затем я проверяю его, используя:

 > kafka-console-producer --broker-list kafka:9092 --topic test-topic --producer.config /etc/kafka/secrets/cli-client.properties
 This is a message
 This is another message

Однако я получаю сообщение об ошибке, что производитель не может аутентифицировать:

[2018-05-17 19:49: 06,955] ОШИБКА [Producer clientId = console -roduction] Подключение к узлу -1 не выполнено при аутентификации из-за: Ошибка аутентификации: Неверное имя пользователя или пароль (org.apache.kafka.clients.NetworkClient)

1 Ответ

0 голосов
/ 01 июня 2018

Я получил следующий ответ из списка рассылки Kafka, попробую на следующей неделе:

Эта функция будет частью грядущего выпуска Kafka 2.0.0.

Документация здесь: https://github.com/apache/kafka/pull/4890

конфиги здесь: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java#L57

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...