Как создать функцию обратного вызова в Kotlin? - PullRequest
1 голос
/ 09 февраля 2020

У меня проблемы с созданием функции обратного вызова по следующему сценарию: потребитель kafka прослушивает новые сообщения для возможности записи в базу данных:

suspend fun consumerClient(service: ClientService) {
    val messages = consumerCommands(
        "create-client", "localhost:9092", "consumer-client", false,
        OffsetBehaviour.Earliest, 10
    )
}

suspend fun consumerCommands(
    topic: String,
    bootstrapServers: String,
    group: String,
    autoCommit: Boolean,
    offsetBehaviour: OffsetBehaviour,
    pollMax: Int
) {
    val consumer = consumer(bootstrapServers, group, autoCommit, offsetBehaviour, pollMax)
    consumer.subscribe(mutableListOf(topic))
    while (true) {
        val records = consumer.poll(Duration.ofMillis(1000))
        if (records.count() > 0) {
            records.forEach {
                val entity = treeToValue(it.value().get("message"), Client::class.java) as Client
                ClientService().insert(entity)
            }
        }
    }
}

Работает отлично. Но я пытаюсь создать что-то более общее c следующим образом:

interface KafkaConsumer<T> {
    fun execute(callback: (T) -> Unit)
}

suspend fun <T> consumerCommand(
    topic: String,
    bootstrapServers: String,
    group: String,
    autoCommit: Boolean,
    offsetBehaviour: OffsetBehaviour,
    pollMax: Int,
    callback: KafkaConsumer<T>
): ConsumerRecords<String, JsonNode>? {
    val consumer = consumer(bootstrapServers, group, autoCommit, offsetBehaviour, pollMax)
    consumer.subscribe(mutableListOf(topic))
    while (true) {
        val records = consumer.poll(Duration.ofMillis(1000))
        if (records.count() > 0) {
            records.forEach {
                val entity = (treeToValue(it.value().get("message"), Any::class.java) as T)
                coroutineScope {
                    callback.execute { entity }
                }
            }
        }
    }
}


suspend fun consumerClient(service: ClientService) {
    val messages = consumerCommand<Client>(
        "create-client", "localhost:9092", "consumer-client", false,
        OffsetBehaviour.Earliest, 10, {client: Client -> ClientService().insert(client)}
    )
}

Но это не работает. Может ли кто-нибудь помочь?

Ответы [ 2 ]

1 голос
/ 09 февраля 2020

Может быть, вы можете сделать что-то вроде этого:

suspend fun <T> consumerCommand(
    topic: String,
    bootstrapServers: String,
    group: String,
    autoCommit: Boolean,
    offsetBehaviour: OffsetBehaviour,
    pollMax: Int,
    callback: (entity: T) -> Unit>
): ConsumerRecords<String, JsonNode>? {
    val consumer = consumer(bootstrapServers, group, autoCommit, offsetBehaviour, pollMax)
    consumer.subscribe(mutableListOf(topic))
    while (true) {
        val records = consumer.poll(Duration.ofMillis(1000))
        if (records.count() > 0) {
            records.forEach {
                val entity = (treeToValue(it.value().get("message"), Any::class.java) as T)
                callback(entity)
            }
        }
    }
}

suspend fun consumerClient(service: ClientService) {
    val messages = consumerCommand<Client>(
        "create-client", "localhost:9092", "consumer-client", false,
        OffsetBehaviour.Earliest, 10, {client: Client -> ClientService().insert(client)}
    )
}
1 голос
/ 09 февраля 2020

Вы пытаетесь использовать лямбду { client: Client -> ... }, где ожидается KafkaConsumer. Вместо этого вам нужно использовать тип функции. Эквивалентом вашего KafkaConsumer будет ((T) -> Unit) -> Unit, но я подозреваю, что это ошибка, и вы действительно хотите

suspend fun <T> consumerCommand(
    topic: String,
    bootstrapServers: String,
    group: String,
    autoCommit: Boolean,
    offsetBehaviour: OffsetBehaviour,
    pollMax: Int,
    callback: (T) -> Unit
): ConsumerRecords<String, JsonNode>? {
    ...
                coroutineScope {
                    callback(entity)
                }
            }
        }
    }
}

Или даже suspend (T) -> Unit.

Примечание: вы не не используйте аргумент service в вашей функции consumerClient, но вместо этого создайте новый ClientService; это намеренно?

...