Есть ли способ передать ключ и объект в качестве возвращаемого значения в Micronaut с Kafka - PullRequest
0 голосов
/ 26 марта 2020
@Topic("my-producer-topic")
@SendTo("my-consumer-topic")
fun receive(@KafkaKey key: String, value: String?): FpcPackProd {
    var objectMapper: ObjectMapper = ObjectMapper()

    var myClassJsonString = value.toString()
    var myClass: myClass = objectMapper.readValue(myClassJsonString,
            myClass::class.java)

    var logger: Logger = LoggerFactory.getLogger("Consumer Logger")
    logger.info("Consumed Data: {}", myClass)

    return myClass
}

companion object {
    private val logger = LoggerFactory.getLogger("Consumer Logger")
}}       data class myClass (var myData: String? = null, var mySecData: String? = null)

В настоящее время я только возвращаю класс, и я хотел бы вернуть класс данных и значение myData в качестве kafka, т.е. вернуть "sendKeyValue (myClass.myData, myClass)". Есть ли способ сделать это с помощью microronaut, kafka и kotlin. Благодаря.

1 Ответ

0 голосов
/ 14 апреля 2020

Невозможно указать ключ при отправке на топи c через @SendTo. Вы можете видеть в исходном коде, что он просто берет ключ из использованной записи https://github.com/micronaut-projects/micronaut-kafka/blob/319c7b1fe6758922559c3038a930ae88ba664e2d/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java#L623.

Вам необходимо создать клиент kafka, как показано ниже:

@KafkaClient
interface MyProducer {
    @Topic("my-consumer-topic")
    fun sendProduct(@KafkaKey key: String?, value: myClass?)
}

А затем введите его вашему потребителю:

@KafkaListener
class MyConsumer {

    @Inject
    lateinit var myProducer: MyProducer

    @Topic("my-producer-topic")
    fun receive(@KafkaKey key: String, value: String?) {
        var objectMapper: ObjectMapper = ObjectMapper()

        var myClassJsonString = value.toString()
        var myClass: myClass = objectMapper.readValue(myClassJsonString,
                myClass::class.java)

        var logger: Logger = LoggerFactory.getLogger("Consumer Logger")
        logger.info("Consumed Data: {}", myClass)

        myProducer.send(myClass.myData, myClass)
    }

    companion object {
        private val logger = LoggerFactory.getLogger("Consumer Logger")
    }
}
data class myClass (var myData: String? = null, var mySecData: String? = null)
...