Ошибка API Kafka Connect при отправке значения по умолчанию STRUCT с JsonConvertor и AvroConvertor - PullRequest
0 голосов
/ 14 января 2020

вот код:

            SchemaBuilder schemaBuilder = SchemaBuilder.struct()
                    .field("province", SchemaBuilder.STRING_SCHEMA)
                    .field("city", SchemaBuilder.STRING_SCHEMA);

            Struct defaultValue = new Struct(schemaBuilder)
                    .put("province", "aaa")
                    .put("city", "aaaa");

            Schema addressSchema = schemaBuilder.defaultValue(defaultValue).build();

            Schema dataSchema = SchemaBuilder.struct().name("personMessage")
                    .field("address", addressSchema).build();
            Struct normalValue = new Struct(addressSchema)
                    .put("province", "bbb")
                    .put("city", "bbbb");


            Struct struct = new Struct(dataSchema).put("address", normalValue);

            BsonTimestamp timestamp = new BsonTimestamp();
            records.add(new SourceRecord(offsetKey(replicaSetName), offsetValue(timestamp.getValue()),
                    topic, struct.schema(), struct));

Я использую JsonConvertor и получаю ошибку:

Caused by: org.apache.kafka.connect.errors.DataException: Mismatching schema.
    at org.apache.kafka.connect.json.JsonConverter.convertToJson(JsonConverter.java:711)
    at org.apache.kafka.connect.json.JsonConverter.asJsonSchema(JsonConverter.java:458)
    at org.apache.kafka.connect.json.JsonConverter.asJsonSchema(JsonConverter.java:431)

относительный код

                    Struct struct = (Struct) value;
                    if (!struct.schema().equals(schema))
                        throw new DataException("Mismatching schema.");
                    ObjectNode obj = JsonNodeFactory.instance.objectNode();
                    for (Field field : schema.fields()) {
                        obj.set(field.name(), convertToJson(field.schema(), struct.get(field)));
                    }
                    return obj;

над кодом будет вызываться по следующему коду:

                jsonSchema = JsonNodeFactory.instance.objectNode().put(JsonSchema.SCHEMA_TYPE_FIELD_NAME, JsonSchema.STRUCT_TYPE_NAME);
                ArrayNode fields = JsonNodeFactory.instance.arrayNode();
                for (Field field : schema.fields()) {
                    ObjectNode fieldJsonSchema = asJsonSchema(field.schema()).deepCopy();
                    fieldJsonSchema.put(JsonSchema.STRUCT_FIELD_NAME_FIELD_NAME, field.name());
                    fields.add(fieldJsonSchema);
                }
                jsonSchema.set(JsonSchema.STRUCT_FIELDS_FIELD_NAME, fields);

я обнаружил, что struct.schema () - это SchemaBuilder, а схема - это Schema {Struct}, когда field = "address".

Таким образом, я могу получить результат установить значение по умолчанию STRUCT для записи, но я не могу отправить запись ...

...