Я немного озадачен тем, как добавить конфиги ACL в Kafka Producer в Scala / Java. В этих документах нет ничего о добавлении конфигурации ACL в код.
https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/producer/ProducerConfig.html
Мне удалось создать алиса производителя и потребителя из консоль и могу создавать сообщения и видеть их при запуске потребителя, как показано ниже.
bin/sasl-kafka-console-consumer-alice.sh --bootstrap-server localhost:9092 --topic acls-test-topic --consumer.config config/sasl-consumer-alice.properties --from-beginning
Есть ли способ добавить эти конфиги / команды в Scala продюсер?
edit: Ниже приведены журналы, которые я получаю, закомментированы ли у меня строки jaas или нет или нет
11:52:43.396 [kafka-producer-network-thread | producer-9] DEBUG o.apache.kafka.clients.NetworkClient - Initiating connection to node -1 at 00.000.000.00:9092.
11:52:43.397 [kafka-producer-network-thread | producer-9] DEBUG o.apache.kafka.clients.NetworkClient - Trying to send metadata request to node -1
11:52:43.448 [kafka-producer-network-thread | producer-9] DEBUG o.apache.kafka.clients.NetworkClient - Completed connection to node -1
11:52:43.498 [kafka-producer-network-thread | producer-9] DEBUG o.apache.kafka.clients.NetworkClient - Trying to send metadata request to node -1
11:52:43.498 [kafka-producer-network-thread | producer-9] DEBUG o.apache.kafka.clients.NetworkClient - Sending metadata request ClientRequest(expectResponse=true, payload=nu
ll, request=RequestSend(header={api_key=3,api_version=0,correlation_id=29,client_id=producer-9}, body={topics=[acls-test-topic]})) to node -1
11:52:43.548 [kafka-producer-network-thread | producer-9] WARN o.a.kafka.common.network.Selector - Error in I/O with /00.000.000.00
java.io.EOFException: null
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:62) ~[kafka-clients-0.8.2.1.jar:na]
at org.apache.kafka.common.network.Selector.poll(Selector.java:248) ~[kafka-clients-0.8.2.1.jar:na]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192) [kafka-clients-0.8.2.1.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191) [kafka-clients-0.8.2.1.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122) [kafka-clients-0.8.2.1.jar:na]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_221]
11:52:43.550 [kafka-producer-network-thread | producer-9] DEBUG o.apache.kafka.clients.NetworkClient - Node -1 disconnected.
11:52:43.551 [kafka-producer-network-thread | producer-9] DEBUG o.apache.kafka.clients.NetworkClient - Give up sending metadata request since no node is available
11:52:43.650 [kafka-producer-network-thread | producer-9] DEBUG o.apache.kafka.clients.NetworkClient - Trying to send metadata request to node -1
11:52:43.650 [kafka-producer-network-thread | producer-9] DEBUG o.apache.kafka.clients.NetworkClient - Init connection to node -1 for sending metadata request in the next iteration
И настройка производителя:
def createNewKafkaProducer(brokerList: String,
acks: Int = -1,
metadataFetchTimeout: Long = 3000L,
blockOnBufferFull: Boolean = true,
bufferSize: Long = 1024L * 1024L,
retries: Int = 0): NewKafkaProducer[String, Array[Byte]] = {
// val jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";"
// val jaasCfg = String.format(jaasTemplate, "alice", "alice")
val producerProps = new Properties()
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
producerProps.put(ProducerConfig.ACKS_CONFIG, acks.toString)
producerProps.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, metadataFetchTimeout.toString)
producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, blockOnBufferFull.toString)
producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferSize.toString)
producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString)
producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100")
producerProps.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "200")
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
// producerProps.put("security.protocol", "SASL_SSL")
// producerProps.put("sasl.mechanism", "SCRAM-SHA-256")
// producerProps.put("sasl.jaas.config", jaasCfg)
new NewKafkaProducer[String, Array[Byte]](producerProps)