У меня проблемы с созданием функции обратного вызова по следующему сценарию: потребитель kafka прослушивает новые сообщения для возможности записи в базу данных:
suspend fun consumerClient(service: ClientService) {
val messages = consumerCommands(
"create-client", "localhost:9092", "consumer-client", false,
OffsetBehaviour.Earliest, 10
)
}
suspend fun consumerCommands(
topic: String,
bootstrapServers: String,
group: String,
autoCommit: Boolean,
offsetBehaviour: OffsetBehaviour,
pollMax: Int
) {
val consumer = consumer(bootstrapServers, group, autoCommit, offsetBehaviour, pollMax)
consumer.subscribe(mutableListOf(topic))
while (true) {
val records = consumer.poll(Duration.ofMillis(1000))
if (records.count() > 0) {
records.forEach {
val entity = treeToValue(it.value().get("message"), Client::class.java) as Client
ClientService().insert(entity)
}
}
}
}
Работает отлично. Но я пытаюсь создать что-то более общее c следующим образом:
interface KafkaConsumer<T> {
fun execute(callback: (T) -> Unit)
}
suspend fun <T> consumerCommand(
topic: String,
bootstrapServers: String,
group: String,
autoCommit: Boolean,
offsetBehaviour: OffsetBehaviour,
pollMax: Int,
callback: KafkaConsumer<T>
): ConsumerRecords<String, JsonNode>? {
val consumer = consumer(bootstrapServers, group, autoCommit, offsetBehaviour, pollMax)
consumer.subscribe(mutableListOf(topic))
while (true) {
val records = consumer.poll(Duration.ofMillis(1000))
if (records.count() > 0) {
records.forEach {
val entity = (treeToValue(it.value().get("message"), Any::class.java) as T)
coroutineScope {
callback.execute { entity }
}
}
}
}
}
suspend fun consumerClient(service: ClientService) {
val messages = consumerCommand<Client>(
"create-client", "localhost:9092", "consumer-client", false,
OffsetBehaviour.Earliest, 10, {client: Client -> ClientService().insert(client)}
)
}
Но это не работает. Может ли кто-нибудь помочь?