Сгенерированный образец потока из утилиты ksql-datagen из следующей схемы -
{
"type": "record",
"name": "users",
**"namespace": "com.example",**
"fields": [
{
"name": "registertime",
"type": {
"type":"long",
"arg.properties":{
"range":{"min":1487715775521,"max":1519273364600}
}
}
},
{
"name": "userid",
"type": {
"type":"string",
"arg.properties":{"regex":"User_[1-9][0-2]"}
}
},
{
"name": "regionid",
"type": {
"type":"string",
"arg.properties":{"regex":"Region_[1-9]"}
}
},
{
"name": "gender",
"type": {
"type":"string",
"arg.properties":{
"options":["MALE","FEMALE","OTHER"]
}
}
}
]}
при проверке на наличие версий он все еще выбирает схему "io.confluent.ksql.avro_schemas" -
curl "http://localhost:8081/subjects/test-user-value/versions/1"
{"subject": "test-user-value", "version": 1, "id": 4, "schema": "{" type ":" record "," name ": "KsqlDataSourceSchema", "имен": "io.confluent.ksql.avro_schemas" , "полей": [{ "имя": "registertime", "тип": [ "нуль", "длинный"]," по умолчанию ": нулевая}, {" имя ":" USERID " "типа": [ "нуль", "строка"], "по умолчанию": нулевая}, { "имя": "RegionId","Тип ": [" нуль», "строка"], "по умолчанию": нулевой}, { "имя": "пол", "тип": [ "нуль", "строка"], "по умолчанию": нулевой}]} "}
Получена следующая ошибка при попытке использования с API-интерфейсом Kafka-streams -
Исключение в потоке" PageView-Users-Stream-Join-eg-1dc610a3-c9d9-4c1e-b5eb-910e4bc74826-StreamThread-1 "org.apache.kafka.streams.errors.StreamsException: обработчик исключения десериализации настроен как сбой при ошибке десериализации.Если вы предпочитаете, чтобы потоковый конвейер продолжался после ошибки десериализации, установите соответствующий параметр default.deserialization.exception.handler.в org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize (RecordDeserializer.java:80) в org.apache.kafka.streams.processor.internals.RecordQueue.maybeUpdateTimestamp (RecordQueue.jap.60).kafka.streams.processor.internals.RecordQueue.addRawRecords (RecordQueue.java:101) в org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords (PartitionGroup.java:124) в org.apachekaf..processor.internals.StreamTask.addRecords (StreamTask.java:711) в org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks (StreamThread.java:995) в org.apache.kafka.streamternalsprocessin..StreamThread.runOnce (StreamThread.java:833) в org.apache.kafka.streams.processor.internals.StreamThread.runLoop (StreamThread.java:777) в org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747) Причина: org.apache.kafka.common.errors.SerializationException: Ошибка десериализации сообщения Avro для идентификатора 4, вызванная: org.apache.kafka.common.errors.SerializationException: Не удалось найти класс io.confluent.ksql.avro_schemas.KsqlDataSourceSchema, указанный в схеме писателя, при поиске схемы считывателя для SpecificRecord.