java .lang.OutOfMemory Ошибка при попытке реализовать SSL / TLS в Kafka на Windows - PullRequest
1 голос
/ 18 июня 2020

Я пытаюсь реализовать шифрование и аутентификацию с использованием SSL / TLS в Kafka на Windows. Я создал ключи и сертификаты TLS в соответствии с этой ссылкой и установил следующие свойства в server.properties :

listeners=SSL://0.0.0.0:9092
advertised.listeners=SSL://<host-name>:9092 *
ssl.keystore.location=C:/DataGrid/Kafka/kafka.server.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234
ssl.truststore.location=C:/DataGrid/Kafka/kafka.server.truststore.jks
ssl.truststore.password=test1234
security.inter.broker.protocol=SSL
ssl.client.auth=required
sasl.enabled.mechanisms=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:<host-name>
security.protocol=SSL

Но при попытке создать topi c с помощью следующей команды в cmd:

kafka-topics.bat --create --bootstrap-server <host-name>:9092 --replication-factor 1 --partitions 1 --topic test

Я получаю эту ошибку:

C:\DataGrid\Kafka\bin\windows>kafka-topics.bat --create --bootstrap-server vmnala:9092 --replication-factor 1 --partitions 1 --topic test
[2020-06-17 11:52:49,629] ERROR Uncaught exception in thread 'kafka-admin-client-thread | adminclient-1': (org.apache.kafka.common.utils.KafkaThread)
java.lang.OutOfMemoryError: Java heap space
        at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
        at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
        at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
        at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)
        at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
        at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
        at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
        at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:539)
        at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1152)
        at java.lang.Thread.run(Thread.java:748)

Глядя на server.log, я вижу следующую ошибку:

[2020-06-17 11:52:49,567] INFO [SocketServer brokerId=1] Failed authentication with /10.2.9.200 (SSL handshake failed) (org.apache.kafka.common.network.Selector)

Кроме того, при попытке для создания потребителя в python я получаю:

>>> from kafka import KafkaConsumer
>>> c = KafkaConsumer(bootstrap_servers='vmnala:9092', security_protocol = "SSL")
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "C:\Python\lib\site-packages\kafka\consumer\group.py", line 324, in __init__
    self._client = KafkaClient(metrics=self._metrics, **self.config)
  File "C:\Python\lib\site-packages\kafka\client_async.py", line 221, in __init__
    self.config['api_version'] = self.check_version(timeout=check_timeout)
  File "C:\Python\lib\site-packages\kafka\client_async.py", line 826, in check_version
    raise Errors.NoBrokersAvailable()
kafka.errors.NoBrokersAvailable: NoBrokersAvailable
>>>

Есть идеи, что может быть причиной этого?

Большое спасибо!

1 Ответ

1 голос
/ 18 июня 2020

Это происходит потому, что вы настроили своих брокеров на использование SSL, но не настраиваете клиентов для этого. По умолчанию клиенты используют PLAINTEXT.

Для kafka-topics вам необходимо передать ему файл через --command-config, который содержит необходимые свойства. Что-то вроде:

ssl.truststore.location=<PATH>
ssl.truststore.password=<PASSWORD>
security.protocol=SSL

Для клиента Python нужно указать соответствующие конфиги в конструкторе. См. https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html#kafka .KafkaConsumer для списка конфигураций

...