Я написал похожий обработчик по мотивам этой статьи: https://medium.com/@jairsjunior/how-to-setup-oauth2-mechanism-to-a-kafka-broker-e42e72839fe
Что мне нужно сделать? В server.properties я задаю следующие строки:
listener.name.sasl_plaintext.oauthbearer.sasl.login.callback.handler.class = br.com.jairsjunior.security.oauthbearer.OauthAuthenticateLoginCallbackHandler
listener.name.sasl_plaintext.oauthbearer.sasl.server.callback.handler.class = br.com.jairsjunior.security.oauthbearer.OauthAuthenticateValidatorCallbackHandler
Файлы скомпилированы и перемещены в папку libs в /path/to/kafka/.
Всё, что я получаю, — это исключение:
org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: java.lang.IllegalArgumentException: Callback handler must be castable to org.apache.kafka.common.security.auth.AuthenticateCallbackHandler: org.apache.kafka.common.security.authenticator.AbstractLogin$LoginCallbackHandler
at org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.initialize(OAuthBearerLoginModule.java:278)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at javax.security.auth.login.LoginContext.invoke(LoginContext.java:736)
at javax.security.auth.login.LoginContext.access$000(LoginContext.java:195)
at javax.security.auth.login.LoginContext$4.run(LoginContext.java:682)
at javax.security.auth.login.LoginContext$4.run(LoginContext.java:680)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:680)
at javax.security.auth.login.LoginContext.login(LoginContext.java:587)
at org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:52)
at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:53)
at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:82)
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:103)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:114)
at org.apache.kafka.common.network.ChannelBuilders.serverChannelBuilder(ChannelBuilders.java:76)
at kafka.network.Processor.<init>(SocketServer.scala:423)
at kafka.network.SocketServer.newProcessor(SocketServer.scala:158)
at kafka.network.SocketServer$$anonfun$startup$1$$anonfun$apply$1.apply$mcVI$sp(SocketServer.scala:95)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
at kafka.network.SocketServer$$anonfun$startup$1.apply(SocketServer.scala:94)
at kafka.network.SocketServer$$anonfun$startup$1.apply(SocketServer.scala:89)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at kafka.network.SocketServer.startup(SocketServer.scala:89)
at kafka.server.KafkaServer.startup(KafkaServer.scala:229)
at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:38)
at kafka.Kafka$.main(Kafka.scala:92)
at kafka.Kafka.main(Kafka.scala)
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:112)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:114)
at org.apache.kafka.common.network.ChannelBuilders.serverChannelBuilder(ChannelBuilders.java:76)
at kafka.network.Processor.<init>(SocketServer.scala:423)
at kafka.network.SocketServer.newProcessor(SocketServer.scala:158)
at kafka.network.SocketServer$$anonfun$startup$1$$anonfun$apply$1.apply$mcVI$sp(SocketServer.scala:95)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
at kafka.network.SocketServer$$anonfun$startup$1.apply(SocketServer.scala:94)
at kafka.network.SocketServer$$anonfun$startup$1.apply(SocketServer.scala:89)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at kafka.network.SocketServer.startup(SocketServer.scala:89)
at kafka.server.KafkaServer.startup(KafkaServer.scala:229)
at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:38)
at kafka.Kafka$.main(Kafka.scala:92)
at kafka.Kafka.main(Kafka.scala)
Caused by: javax.security.auth.login.LoginException: java.lang.IllegalArgumentException: Callback handler must be castable to org.apache.kafka.common.security.auth.AuthenticateCallbackHandler: org.apache.kafka.common.security.authenticator.AbstractLogin$LoginCallbackHandler
at org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.initialize(OAuthBearerLoginModule.java:278)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at javax.security.auth.login.LoginContext.invoke(LoginContext.java:736)
at javax.security.auth.login.LoginContext.access$000(LoginContext.java:195)
at javax.security.auth.login.LoginContext$4.run(LoginContext.java:682)
at javax.security.auth.login.LoginContext$4.run(LoginContext.java:680)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:680)
at javax.security.auth.login.LoginContext.login(LoginContext.java:587)
at org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:52)
at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:53)
at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:82)
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:103)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:114)
at org.apache.kafka.common.network.ChannelBuilders.serverChannelBuilder(ChannelBuilders.java:76)
at kafka.network.Processor.<init>(SocketServer.scala:423)
at kafka.network.SocketServer.newProcessor(SocketServer.scala:158)
at kafka.network.SocketServer$$anonfun$startup$1$$anonfun$apply$1.apply$mcVI$sp(SocketServer.scala:95)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
at kafka.network.SocketServer$$anonfun$startup$1.apply(SocketServer.scala:94)
at kafka.network.SocketServer$$anonfun$startup$1.apply(SocketServer.scala:89)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at kafka.network.SocketServer.startup(SocketServer.scala:89)
at kafka.server.KafkaServer.startup(KafkaServer.scala:229)
at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:38)
at kafka.Kafka$.main(Kafka.scala:92)
at kafka.Kafka.main(Kafka.scala)
at javax.security.auth.login.LoginContext.invoke(LoginContext.java:856)
at javax.security.auth.login.LoginContext.access$000(LoginContext.java:195)
at javax.security.auth.login.LoginContext$4.run(LoginContext.java:682)
at javax.security.auth.login.LoginContext$4.run(LoginContext.java:680)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:680)
at javax.security.auth.login.LoginContext.login(LoginContext.java:587)
at org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:52)
at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:53)
at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:82)
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:103)