Сделай абстракцию для потребителя кафки используя ktor и kotlin - PullRequest
0 голосов
/ 28 января 2020

Я создаю абстракцию для потребителя и производителя Kafka, чтобы постоянно избегать дублирования кода. Поэтому я создал библиотеку, используя kotlin и gradle, названную "kafka-commons", и поместил следующий код: Для производителя Kafka:

fun producer(
    bootstrapServers: String,
    idempotence: Boolean,
    acks: Acks,
    retries: Int,
    requestPerConnection: Int,
    compression: Compression,
    linger: Int,
    batchSize: BatchSize
): KafkaProducer<String, Any> {
    val prop: HashMap<String, Any> = HashMap()
    prop[BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
    prop[KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name
    prop[VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name
    prop[ENABLE_IDEMPOTENCE_CONFIG] = idempotence
    prop[ACKS_CONFIG] = acks.value
    prop[RETRIES_CONFIG] = retries
    prop[MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION] = requestPerConnection
    prop[COMPRESSION_TYPE_CONFIG] = compression.value
    prop[LINGER_MS_CONFIG] = linger
    prop[BATCH_SIZE_CONFIG] = batchSize.value

    return KafkaProducer(prop)
}

suspend inline fun <reified K : Any, reified V : Any> KafkaProducer<K, V>.dispatch(record: ProducerRecord<K, V>) =
    suspendCoroutine<RecordMetadata> { continuation ->
        val callback = Callback { metadata, exception ->
            if (metadata == null) {
                continuation.resumeWithException(exception!!)
            } else {
                continuation.resume(metadata)
            }
        }
        this.send(record, callback)
    }

Итак, я создал объект Command по умолчанию со следующей структурой

data class Command(
    val id: UUID,
    val status: CommandStatus,
    val message: Any
)

id: создать уникальный статус ID: создать статус сообщения (может быть: Open / Processing / Closed / Error) message: объект из http запроса (например: если есть сообщение

Например: если есть «вставить пользователя: POST» с телом:

{"id": 1, "name": "John", "lastName": "Wick" }

поэтому сообщение будет этим объектом и т. Д.

И для создания этой команды я сделал эту функцию:

suspend fun creatCommand(
    topicName: String,
    id: UUID,
    commandStatus: CommandStatus,
    request: Any,
    bootstrapServers: String,
    idempotence: Boolean,
    acks: Acks,
    retries: Int,
    requestPerConnection: Int,
    compression: Compression,
    linger: Int,
    batchSize: BatchSize
): Unit {
    val producer = producer(
        bootstrapServers,
        idempotence,
        acks,
        retries,
        requestPerConnection,
        compression,
        linger,
        batchSize)

    val command = toCommand(processStarted(id, commandStatus, request))
    val record = ProducerRecord<String, Any>(topicName, id.toString(), command)
    coroutineScope { launch { producer.dispatch(record) } }
}

Итак, я создал другие API и просто вызвал эту функцию, чтобы создать продюсера, который посылает команду kafka. Например:

fun Route.user(service: Service) =
    route("/api/access") {
        post("/test") {
            call.respond(service.command(call.receive())) 
            }
}

>>>>>> other class <<<<<<<<

classService () {
    fun command( all parameters) { creatCommand(all parameters)} 
}

Слишком хорошо. Все отлично работает.

Теперь мои проблемы начинаются. Я пытаюсь создать потребителя. Сначала я сделал это:

fun consumer(
    bootstrapServers: String,
    group: String,
    autoCommit: Boolean,
    offsetBehaviour: OffsetBehaviour,
    pollMax: Int
): KafkaConsumer<String, Any> {
    val prop: HashMap<String, Any> = HashMap()
    prop[BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
    prop[KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
    prop[VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
    prop[GROUP_ID_CONFIG] = group
    prop[AUTO_OFFSET_RESET_CONFIG] = offsetBehaviour
    prop[ENABLE_AUTO_COMMIT_CONFIG] = autoCommit
    prop[MAX_POLL_RECORDS_CONFIG] = pollMax

    return KafkaConsumer(prop)
}

И после:

fun<T> recordingCommand(
    command: Class<T>,
    topic: String,
    bootstrapServers: String,
    group: String,
    autoCommit: Boolean,
    offsetBehaviour: OffsetBehaviour,
    pollMax: Int
) {
    val consumer = consumer(bootstrapServers, group, autoCommit, offsetBehaviour, pollMax)
    consumer.subscribe(mutableListOf(topic))
    while (true) {
        val records = consumer.poll(Duration.ofMillis(100))
        for (record in records) {
            val om = ObjectMaper
            om.readvalue(record.value(), command::class.java)

            >>>> I GOT LOST HERE <<<<<<

        }
    }
}

Что мне нужно: создание абстра Потребитель ct, который записывает все данные внутри Command.message () (только сообщение) в базе данных.

Например, мне нужно записать пользователя выше (id 1, john wick) в postgresql база данных. Так что, если у меня есть служба с методом вставки, я могу вызвать ее, передавая метод вставки, например:

service.insert(recordingCommand(all parameters)).

Кто-нибудь может мне помочь с этим?

1 Ответ

1 голос
/ 28 января 2020

Если я правильно понял, у вас есть проблемы с отображением JSON в объекты

   val mapper = ObjectMaper
   while (true) {
        val records = consumer.poll(Duration.ofMillis(100))
        for (record in records) {
            val cmd = mapper.readvalue(record.value(), command::class.java)
            // do things with cmd
        }
    }

Примечание: у Kafka есть свой собственный десериализатор JSON для POJO, и если вы хотите отправить данные на База данных, Kafka Connect, как правило, более отказоустойчива, чем ваш простой l oop.

...