Я пытаюсь реализовать шифрование и аутентификацию с использованием SSL / TLS в Kafka на Windows. Я создал ключи и сертификаты TLS в соответствии с этой ссылкой и установил следующие свойства в server.properties :
listeners=SSL://0.0.0.0:9092
advertised.listeners=SSL://<host-name>:9092 *
ssl.keystore.location=C:/DataGrid/Kafka/kafka.server.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234
ssl.truststore.location=C:/DataGrid/Kafka/kafka.server.truststore.jks
ssl.truststore.password=test1234
security.inter.broker.protocol=SSL
ssl.client.auth=required
sasl.enabled.mechanisms=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:<host-name>
security.protocol=SSL
Но при попытке создать topi c с помощью следующей команды в cmd:
kafka-topics.bat --create --bootstrap-server <host-name>:9092 --replication-factor 1 --partitions 1 --topic test
Я получаю эту ошибку:
C:\DataGrid\Kafka\bin\windows>kafka-topics.bat --create --bootstrap-server vmnala:9092 --replication-factor 1 --partitions 1 --topic test
[2020-06-17 11:52:49,629] ERROR Uncaught exception in thread 'kafka-admin-client-thread | adminclient-1': (org.apache.kafka.common.utils.KafkaThread)
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:539)
at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1152)
at java.lang.Thread.run(Thread.java:748)
Глядя на server.log, я вижу следующую ошибку:
[2020-06-17 11:52:49,567] INFO [SocketServer brokerId=1] Failed authentication with /10.2.9.200 (SSL handshake failed) (org.apache.kafka.common.network.Selector)
Кроме того, при попытке для создания потребителя в python я получаю:
>>> from kafka import KafkaConsumer
>>> c = KafkaConsumer(bootstrap_servers='vmnala:9092', security_protocol = "SSL")
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "C:\Python\lib\site-packages\kafka\consumer\group.py", line 324, in __init__
self._client = KafkaClient(metrics=self._metrics, **self.config)
File "C:\Python\lib\site-packages\kafka\client_async.py", line 221, in __init__
self.config['api_version'] = self.check_version(timeout=check_timeout)
File "C:\Python\lib\site-packages\kafka\client_async.py", line 826, in check_version
raise Errors.NoBrokersAvailable()
kafka.errors.NoBrokersAvailable: NoBrokersAvailable
>>>
Есть идеи, что может быть причиной этого?
Большое спасибо!