У меня 2 сервиса (обе пружинные на kotlin). Названы "клиент" и "сервер". Из-за некоторых ограничений я должен использовать синхронный шаблон запроса-ответа с kafka. Поэтому я пытаюсь использовать ReplyingKafkaTemplate. Моя проблема в том, что мне нужно использовать для нескольких объектов. Означает создание нескольких шаблонов ReplyingKafkaTemplate, один для «FOO», второй для «BAR». Поэтому в моем коде я создаю несколько классов KafkaConfig с настройкой каждого объекта и одной базовой конфигурацией.
Также я исключаю KafkaAutoConfig из загрузки. Конфигурация ниже для "серверной" стороны (без фиксации шаблона kafka):
@Configuration
@EnableKafka
class KafkaConfig @Autowired constructor(
@Value("\${kafka.bootstrap-servers}")
private var bootstrapServers: String,
@Value("\${kafka.consumer-group.name}")
private var consumerGroup: String,
@Value("\${kafka.consumer-group.id}")
private var groupId: Number
) {
@Bean("kafkaProducerConfig")
fun producerConfigs(): MutableMap<String, Any> {
return mutableMapOf(
Pair(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9093,kafka3:9094"),
Pair(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java),
Pair(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer::class.java),
Pair(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"),
Pair(ProducerConfig.ACKS_CONFIG, "all"),
Pair(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5"),
Pair(ProducerConfig.RETRIES_CONFIG, Int.MAX_VALUE.toString()),
Pair(ProducerConfig.LINGER_MS_CONFIG, "20"),
Pair(ProducerConfig.BATCH_SIZE_CONFIG, (32 * 1024).toString()),
Pair(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy")
)
}
@Bean("kafkaConsumerConfig")
fun consumerConfigs(): Map<String, Any> {
return mutableMapOf(
Pair(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9093,kafka3:9094"),
Pair(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer::class.java),
Pair(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSerializer::class.java),
Pair(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"),
Pair(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup + groupId)
)
}
@Bean("kafkaAdminConfig")
fun admin(): KafkaAdmin {
val configs: MutableMap<String, Any> = HashMap()
configs[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
return KafkaAdmin(configs)
}
}
и одна для FOO (BAR такая же конфигурация, что и foo вместо других сущностей, и имя bean-компонента):
@Configuration
class KafkaConfigForFOO {
@Value("\${kafka.topic.request-consumable-topic}")
private lateinit var requestConsumableTopic: String
@Value("\${kafka.request-reply.timeout-ms}")
private lateinit var replyTimeout: Number
@Bean("requestFOOTopicConfig")
fun requestConsumableTopic(): NewTopic {
val configs: MutableMap<String, String> = HashMap()
configs["retention.ms"] = replyTimeout.toString()
return NewTopic(requestConsumableTopic, 6, 3.toShort()).configs(configs)
}
@Bean("producerFactoryForFOO")
@Autowired
fun producerFactoryForFOO(@Qualifier("kafkaProducerConfig") producerConfigs: MutableMap<String, Any>):
ProducerFactory<String, FOO> = DefaultKafkaProducerFactory(producerConfigs)
@Bean("kafkaTemplateForFOO")
@Autowired
fun kafkaTemplateForFOO(@Qualifier("producerFactoryForFOO") producerFactory: ProducerFactory<String, FOO>):
KafkaTemplate<String, FOO> = KafkaTemplate(producerFactory)
@Bean("consumerFactoryForFOO")
@Autowired
fun consumerFactoryForFOO(@Qualifier("kafkaConsumerConfig") consumerConfigs: MutableMap<String, Any>):
ConsumerFactory<String, FOO> = DefaultKafkaConsumerFactory(consumerConfigs, StringDeserializer(), JsonDeserializer(FOO::class.java))
@Bean("kafkaListenerContainerFactoryForFOO")
@Autowired
fun kafkaListenerContainerFactoryForFOO(
@Qualifier("consumerFactoryForFOO") consumerFactory: ConsumerFactory<String, FOO>,
@Qualifier("kafkaTemplateForFOO") kafkaTemplate: KafkaTemplate<String, FOO>
):
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, FOO>> {
val factory = ConcurrentKafkaListenerContainerFactory<String, FOO>()
factory.consumerFactory = consumerFactory
factory.setReplyTemplate(kafkaTemplate)
return factory
}
}
А также класс обслуживания с прослушивателем kafka:
@Component
class FOOReplyingKafkaConsumer @Autowired constructor(
private val fooService: FooService
) {
@KafkaListener(topics = ["\${kafka.topic.request-FOO-topic}"], containerFactory = "kafkaListenerContainerFactoryForFoo", groupId = "\${spring.kafka.consumer.group-id}")
@SendTo()
fun cropListen(request: FOO): FOO{
return FOO(fooService.getAllByIds(request.ids ?: mutableSetOf()).toMutableSet())
}
}
Проблема в том, что если я удалю имя bean-компонента из общей пружины конфигурации потребителя / производителя, создавая циклические зависимости между kafkaTEmpalte и продюсеромFactory. Когда я возвращаю его обратно, spring не видит от него реквизит kafka и выкидывает Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
из-за невозможности разобрать bootstrap -серверы.
Также, если я удаляю исключение KafkaAutoConfiguration, генерируется исключение, которое ContainerListener не может знать, что вводит конфигурацию ведьмы
С простым java приложением Spring kafka с одним шаблоном все хорошо.