настройка ReplyingKafkaTemplate для нескольких сущностей - PullRequest
0 голосов
/ 26 мая 2020

У меня 2 сервиса (обе пружинные на kotlin). Названы "клиент" и "сервер". Из-за некоторых ограничений я должен использовать синхронный шаблон запроса-ответа с kafka. Поэтому я пытаюсь использовать ReplyingKafkaTemplate. Моя проблема в том, что мне нужно использовать для нескольких объектов. Означает создание нескольких шаблонов ReplyingKafkaTemplate, один для «FOO», второй для «BAR». Поэтому в моем коде я создаю несколько классов KafkaConfig с настройкой каждого объекта и одной базовой конфигурацией.

Также я исключаю KafkaAutoConfig из загрузки. Конфигурация ниже для "серверной" стороны (без фиксации шаблона kafka):

@Configuration
@EnableKafka
class KafkaConfig @Autowired constructor(
    @Value("\${kafka.bootstrap-servers}")
    private var bootstrapServers: String,
    @Value("\${kafka.consumer-group.name}")
    private var consumerGroup: String,
    @Value("\${kafka.consumer-group.id}")
    private var groupId: Number
) {

    @Bean("kafkaProducerConfig")
    fun producerConfigs(): MutableMap<String, Any> {
        return mutableMapOf(
            Pair(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9093,kafka3:9094"),
            Pair(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java),
            Pair(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer::class.java),
            Pair(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"),
            Pair(ProducerConfig.ACKS_CONFIG, "all"),
            Pair(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5"),
            Pair(ProducerConfig.RETRIES_CONFIG, Int.MAX_VALUE.toString()),
            Pair(ProducerConfig.LINGER_MS_CONFIG, "20"),
            Pair(ProducerConfig.BATCH_SIZE_CONFIG, (32 * 1024).toString()),
            Pair(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy")
        )
    }

    @Bean("kafkaConsumerConfig")
    fun consumerConfigs(): Map<String, Any> {
        return mutableMapOf(
            Pair(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9093,kafka3:9094"),
            Pair(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer::class.java),
            Pair(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSerializer::class.java),
            Pair(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"),
            Pair(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup + groupId)
        )
    }

    @Bean("kafkaAdminConfig")
    fun admin(): KafkaAdmin {
        val configs: MutableMap<String, Any> = HashMap()
        configs[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
        return KafkaAdmin(configs)
    }
}

и одна для FOO (BAR такая же конфигурация, что и foo вместо других сущностей, и имя bean-компонента):

@Configuration
class KafkaConfigForFOO {

    @Value("\${kafka.topic.request-consumable-topic}")
    private lateinit var requestConsumableTopic: String

    @Value("\${kafka.request-reply.timeout-ms}")
    private lateinit var replyTimeout: Number

    @Bean("requestFOOTopicConfig")
    fun requestConsumableTopic(): NewTopic {
        val configs: MutableMap<String, String> = HashMap()
        configs["retention.ms"] = replyTimeout.toString()
        return NewTopic(requestConsumableTopic, 6, 3.toShort()).configs(configs)
    }

    @Bean("producerFactoryForFOO")
    @Autowired
    fun producerFactoryForFOO(@Qualifier("kafkaProducerConfig") producerConfigs: MutableMap<String, Any>):
        ProducerFactory<String, FOO> = DefaultKafkaProducerFactory(producerConfigs)

    @Bean("kafkaTemplateForFOO")
    @Autowired
    fun kafkaTemplateForFOO(@Qualifier("producerFactoryForFOO") producerFactory: ProducerFactory<String, FOO>):
        KafkaTemplate<String, FOO> = KafkaTemplate(producerFactory)

    @Bean("consumerFactoryForFOO")
    @Autowired
    fun consumerFactoryForFOO(@Qualifier("kafkaConsumerConfig") consumerConfigs: MutableMap<String, Any>):
        ConsumerFactory<String, FOO> = DefaultKafkaConsumerFactory(consumerConfigs, StringDeserializer(), JsonDeserializer(FOO::class.java))

    @Bean("kafkaListenerContainerFactoryForFOO")
    @Autowired
    fun kafkaListenerContainerFactoryForFOO(
        @Qualifier("consumerFactoryForFOO") consumerFactory: ConsumerFactory<String, FOO>,
        @Qualifier("kafkaTemplateForFOO") kafkaTemplate: KafkaTemplate<String, FOO>
    ):
        KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, FOO>> {
        val factory = ConcurrentKafkaListenerContainerFactory<String, FOO>()
        factory.consumerFactory = consumerFactory
        factory.setReplyTemplate(kafkaTemplate)
        return factory
    }
}

А также класс обслуживания с прослушивателем kafka:

@Component
class FOOReplyingKafkaConsumer @Autowired constructor(
    private val fooService: FooService
) {
    @KafkaListener(topics = ["\${kafka.topic.request-FOO-topic}"], containerFactory = "kafkaListenerContainerFactoryForFoo", groupId = "\${spring.kafka.consumer.group-id}")
    @SendTo()
    fun cropListen(request: FOO): FOO{
        return FOO(fooService.getAllByIds(request.ids ?: mutableSetOf()).toMutableSet())
    }
}

Проблема в том, что если я удалю имя bean-компонента из общей пружины конфигурации потребителя / производителя, создавая циклические зависимости между kafkaTEmpalte и продюсеромFactory. Когда я возвращаю его обратно, spring не видит от него реквизит kafka и выкидывает Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers из-за невозможности разобрать bootstrap -серверы.

Также, если я удаляю исключение KafkaAutoConfiguration, генерируется исключение, которое ContainerListener не может знать, что вводит конфигурацию ведьмы

С простым java приложением Spring kafka с одним шаблоном все хорошо.

Ответы [ 2 ]

1 голос
/ 26 мая 2020

Вам не нужны две фабрики потребителей; стирание типа означает, что оно не имеет значения во время выполнения.

Boot конфигурирует один как

ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(

, что фактически равно <Object, Object> (или <Any, Any> в Kotlin).

Для обоих слушателей может использоваться одна и та же фабрика.

Поскольку вы используете десериализатор JSON; тип определяется из заголовков, установленных на отправляющей стороне.

То же самое с шаблоном.

public KafkaTemplate<?, ?> kafkaTemplate

Его можно вводить несколько раз с разными типами c или как <Object, Object>.

0 голосов
/ 27 мая 2020

Спасибо Гэри Расселу за правильное направление. Я удаляю все конфиги, кроме конфигурации производителя и kafkaTemplate. Автоматически подключитесь к конфигурации весенней загрузки по умолчанию kafkaTemplate с нужным типом.

...