У меня есть API, использующий Джексон для сериализации / десериализации, и все работает отлично. Я попытался переключиться на kotlinx и имел некоторые ошибки. Как работает мой API: у меня есть библиотека, которую я могу использовать для использования во всех моих API, это абстракция для потребителей и производителей Kafka. Все начинается, когда API получает запрос http. Давайте использовать конечную точку
/api/client
в качестве примера. Получение следующего JSON
{
"id" : "uuid-here",
"name" : "Name",
"phones: [
{
"id" : "uuid-here",
"number" : "+1 589 9652 5522"
}
]
}
В моем KTOR Orchestration API я объявлю, что хочу, чтобы Джексон сериализовал /
install(ContentNegotiation) {
jackson {
registerModule(KotlinModule())
dateFormat = DateFormat.getDateInstance(3)
enable(SerializationFeature.INDENT_OUTPUT)
}
}
И у меня есть мои модели:
data class Client(
val id: UUID,
val name: String,
val phones : List<Phone>
)
data class Phone (
val id: UUID
val number: String
)
Итак, я получу JSON запросов в API контроллера Orchestration
fun Route.client(service: ClientService) {
route("/api/client") {
post("/") {
val client = call.receive<Client>()
call.respond(HttpStatusCode.Created, service.sendCommand(client)
}
}
}
Пока все хорошо. Теперь в моей библиотеке я создам Kafka Producer
fun producer(
bootstrapServers: String,
): KafkaProducer<JsonNode, JsonNode> {
val prop: HashMap<String, Any> = HashMap()
prop[BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
prop[KEY_SERIALIZER_CLASS_CONFIG] = JsonSerializer::class.java.name // my key will be json
prop[VALUE_SERIALIZER_CLASS_CONFIG] = JsonSerializer::class.java.name // my value will be json
return KafkaProducer(prop)
}
Теперь я создам функцию для абстрагирования создания производителей.
data class Event (
val id: UUID
val status: String
val message: Any
)
suspend fun producerCommand(
topicName: String,
id: UUID,
status: Status // its an Enum,
message: Any,
bootstrapServer: String
) {
val producer = producer(bootstrapServers)
val record = ProducerRecord<JsonNode, JsonNode>(topicName, valueToTree(id), valueToTree(Event(id, status, message)))
coroutineScope { launch { producer.dispatch(record) } }
}
Моя функция нуждается в классе событий, потому что она не отправляет клиента напрямую в Kafka topi c. Он отправляет событие, которое может иметь клиента (или другие классы)
valueToTree - это функция для преобразования объекта в JsonNode
fun valueToTree(obj: Any): JsonNode? {
try {
return jacksonObjectMapper().valueToTree(obj)
} catch (ex: JsonMappingException) {
ex.printStackTrace()
}
return null
}
. Обратно в мой API я создам службу для вызова Команде производителя
class ClientService {
suspend fun sendCommand(client: Client) {
producerCommand(
"insert-client", client.id, Status.Open, client, "localhost:9092"
)
}
}
Все работает отлично. Теперь, если я изменю сериализацию Джексона для kotlinx:
install(ContentNegotiation) {
serialization(
contentType = ContentType.Application.Json,
json = Json(
DefaultJsonConfiguration.copy(
prettyPrint = true
)
)
)
}
Изменение моделей:
@Serializable
data class Client(
@Serializable(UUIDSerializer::class)
val id: UUID,
val name: String,
val phones : List<Phone>
)
@Serializable
data class Phone (
@Serializable(UUIDSerializer::class)
val id: UUID
val number: String
)
Изменение Kafka Producer
fun producer(
bootstrapServers: String,
schemaUrl: String
): KafkaProducer<String, GenericRecord> {
val prop: HashMap<String, Any> = HashMap()
prop[BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
prop[KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name
prop[VALUE_SERIALIZER_CLASS_CONFIG] = KafkaAvroSerializer::class.java.name
prop[SCHEMA_REGISTRY_URL_CONFIG] = schemaUrl
return KafkaProducer(prop)
}
Теперь мой ключ будет строкой , и моим значением будет схема Avro.
Изменение моей функции originCommand
@Serializable
data class Event(
@Serializable(UUIDSerializer::class)
val id: UUID,
val status: Status,
@ContextualSerialization
val message: Any
)
suspend fun producerCommand(
topicName: String,
id: UUID,
status: Status,
message: Any,
bootstrapServers: String,
schemaUrl: String
) {
val producer = producer(
bootstrapServers,
schemaUrl)
Avro.default.schema(Event.serializer())
val avroSchema = Avro.default.toRecord(Event.serializer(), Event(id, status, message))
val record = ProducerRecord<String, GenericRecord>(topicName, id.toString(), avroSchema)
coroutineScope { launch { producer.dispatch(record) } }
}
И, наконец, обновление моей службы
class ClientService {
suspend fun sendCommand(client: Client) {
producerCommand(
"insert-client", UUID.randomUUID(), Status.Open, client, "localhost:9092",
"http://localhost:8081"
)
}
}
Теперь, если я попытаюсь запустить API и потребляют JSON запрос Я получаю ошибку
java .lang.IndexOutOfBoundsException: Индекс 0 выходит за пределы для длины 0