Spring kafka с ssl, ошибка производителя "Duplicate key" - PullRequest
0 голосов
/ 29 мая 2020

У меня есть сервер Kafka с SSL. При отправке ему сообщений возникает ошибка:

2020-05-29 19:19:27.677 [kafka-producer-network-thread | producer-1] ERROR o.a.k.c.producer.internals.Sender - [Producer clientId=producer-1] Uncaught error in kafka producer I/O thread: 
java.lang.IllegalStateException: Duplicate key host:9091 (id: 1 rack: null)
    at java.util.stream.Collectors.lambda$throwingMerger$0(Collectors.java:133)
    at java.util.HashMap.merge(HashMap.java:1254)
    at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320)

Я не понимаю, что подразумевается под "Дубликатом ключа". Моя конфигурация в application.properties:

spring.kafka.bootstrap-servers=host:9091
spring.kafka.properties.security.protocol=SSL
spring.kafka.ssl.trust-store-password=kafka
spring.kafka.properties.ssl.endpoint.identification.algorithm=
spring.kafka.ssl.key-store-password=abc
spring.kafka.ssl.trust-store-location=file:/path/client.truststore.jks
spring.kafka.ssl.key-store-location=file:/path/client.keystore.jks

Частичная конфигурация производителя Kafka из журналов во время выполнения:

    acks = 1
    bootstrap.servers = [host:9091]
    client.dns.lookup = default
    client.id = 
    enable.idempotence = false
    interceptor.classes = []
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    linger.ms = 0
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    security.protocol = SSL
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = 
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = /path/client.keystore.jks
    ssl.keystore.password = [hidden]
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = /path/client.truststore.jks
    ssl.truststore.password = [hidden]
    ssl.truststore.type = JKS
    transactional.id = null
    value.serializer = class org.apache.kafka.common.serialization.StringSerializer

Что-то не так? Любая помощь будет принята с благодарностью.

Мне кажется, что версии могут быть несовместимы в соответствии с таблицей в этом документе . Мои версии:

spring-boot: 2.2.6.RELEASE
spring-kafka: 2.3.7.RELEASE
Kafka server: 0.10.1.1

Однако я смог успешно производить и использовать с сервера Kafka (0.10.0.0) без SSL, используя текущую версию.

...