Исключение десериализации с помощью утилиты ksql-datagen, сгенерированной данными - PullRequest
0 голосов
/ 04 января 2019

Сгенерированный образец потока из утилиты ksql-datagen из следующей схемы -

{
        "type": "record",
        "name": "users",
        **"namespace": "com.example",**
        "fields": [
        {
        "name": "registertime",
        "type": {
            "type":"long",
            "arg.properties":{
                "range":{"min":1487715775521,"max":1519273364600}
                }
        }
        },
        {
                "name": "userid",
                "type": {
            "type":"string",
            "arg.properties":{"regex":"User_[1-9][0-2]"}
        }
        },
        {
                "name": "regionid",
                "type": {
            "type":"string",
            "arg.properties":{"regex":"Region_[1-9]"}
        }
        },
        {
                "name": "gender",
                "type": {
            "type":"string",
            "arg.properties":{
            "options":["MALE","FEMALE","OTHER"]
            }
        }
        }
]}

при проверке на наличие версий он все еще выбирает схему "io.confluent.ksql.avro_schemas" -

curl "http://localhost:8081/subjects/test-user-value/versions/1"

{"subject": "test-user-value", "version": 1, "id": 4, "schema": "{" type ":" record "," name ": "KsqlDataSourceSchema", "имен": "io.confluent.ksql.avro_schemas" , "полей": [{ "имя": "registertime", "тип": [ "нуль", "длинный"]," по умолчанию ": нулевая}, {" имя ":" USERID " "типа": [ "нуль", "строка"], "по умолчанию": нулевая}, { "имя": "RegionId","Тип ": [" нуль», "строка"], "по умолчанию": нулевой}, { "имя": "пол", "тип": [ "нуль", "строка"], "по умолчанию": нулевой}]} "}

Получена следующая ошибка при попытке использования с API-интерфейсом Kafka-streams -

Исключение в потоке" PageView-Users-Stream-Join-eg-1dc610a3-c9d9-4c1e-b5eb-910e4bc74826-StreamThread-1 "org.apache.kafka.streams.errors.StreamsException: обработчик исключения десериализации настроен как сбой при ошибке десериализации.Если вы предпочитаете, чтобы потоковый конвейер продолжался после ошибки десериализации, установите соответствующий параметр default.deserialization.exception.handler.в org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize (RecordDeserializer.java:80) в org.apache.kafka.streams.processor.internals.RecordQueue.maybeUpdateTimestamp (RecordQueue.jap.60).kafka.streams.processor.internals.RecordQueue.addRawRecords (RecordQueue.java:101) в org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords (PartitionGroup.java:124) в org.apachekaf..processor.internals.StreamTask.addRecords (StreamTask.java:711) в org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks (StreamThread.java:995) в org.apache.kafka.streamternalsprocessin..StreamThread.runOnce (StreamThread.java:833) в org.apache.kafka.streams.processor.internals.StreamThread.runLoop (StreamThread.java:777) в org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747) Причина: org.apache.kafka.common.errors.SerializationException: Ошибка десериализации сообщения Avro для идентификатора 4, вызванная: org.apache.kafka.common.errors.SerializationException: Не удалось найти класс io.confluent.ksql.avro_schemas.KsqlDataSourceSchema, указанный в схеме писателя, при поиске схемы считывателя для SpecificRecord.

1 Ответ

0 голосов
/ 09 января 2019

Отвечено на https://github.com/confluentinc/schema-registry/issues/980

Datagen всегда определяет пространство имен как io.confluent.ksql.avro_schemas.См. confluentinc / ksql # 1906

Теперь есть другие способы для генерации тестовых данных также в Kafka.

...