Я пытаюсь создать собственную политику для тем Kafka. Я обнаружил, что конструктор и метод configure()
вызываются при запуске брокера.
Но метод validate()
не вызывается при создании новой темы с помощью инструмента kafka-topics.sh .
import org.apache.kafka.common.errors.*;
import org.apache.kafka.server.policy.*;
import java.util.*;
public class policy implements CreateTopicPolicy
{
public policy()
{
System.out.println("came to policy constructor");
}
public void configure(Map<String,?> map){
System.out.println("came to configure in policy");
}
public void close(){
System.out.println("closing");
}
public void validate(RequestMetadata requestMetadata) throws PolicyViolationException
{
System.out.println("Came to validate in policy");
throw new PolicyViolationException("policy violated");
}
}
Есть ли какая-либо конфигурация, кроме этой?
create.topic.policy.class.name
В журналах брокера я вижу сообщения ..
came to policy constructor
came to configure in policy
Но не Came to validate in policy
при создании новой темы.
Обновление: Следующий ответ работает для инструмента kafka-topics.sh, но не работает для автоматически созданных тем.
Есть ли способ применить проверку автоматически создаваемых тем?
(В настоящее время я установил для автоматического создания темы значение false)