У меня есть тема kafka, которая создается из binlog базы данных MySQL.Он сериализуется в формате avro, и когда я пытаюсь десериализовать потребление из этой темы, я получаю следующее исключение:
ERROR org.apache.kafka.streams.errors.LogAndFailExceptionHandler Exception caught during Deserialization, taskId: 0_1, topic: mysql.db.couriers, partition: 1, offset: 14163362
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 1541
Caused by: org.apache.avro.AvroTypeException: Found long, expecting union
Это мой файл avro:
{
"type": "record",
"name": "Envelope",
"namespace": "mysql.db.couriers",
"fields": [
{
"name": "before",
"type": [
"null",
{
"type": "record",
"name": "Value",
"fields": [
{
"name": "id",
"type": "long"
},
{
"name": "user_id",
"type": "long"
},
{
"name": "is_vip",
"type": [
{
"type": "int",
"connect.default": 0,
"connect.type": "int16"
},
"null"
],
"default": 0
},
{
"name": "has_box",
"type": [
{
"type": "int",
"connect.default": 0,
"connect.type": "int16"
},
"null"
],
"default": 0
},
{
"name": "is_priority",
"type": [
{
"type": "int",
"connect.default": 0,
"connect.type": "int16"
},
"null"
],
"default": 0
},
{
"name": "tier",
"type": ["null", "string"], "default": null
},
{
"name": "tiered_at",
"type": [
"null",
{
"type": "string",
"connect.version": 1,
"connect.name": "io.debezium.time.ZonedTimestamp"
}
],
"default": null
},
{
"name": "tiered_till",
"type": [
"null",
{
"type": "string",
"connect.version": 1,
"connect.name": "io.debezium.time.ZonedTimestamp"
}
],
"default": null
},
{
"name": "annual_turnover",
"type": ["null", "long"], "default": null
},
{
"name": "total_turnover",
"type": ["null", "long"], "default": null
},
{
"name": "loyalty_score",
"type": ["null", "long"], "default": null
},
{
"name": "total_score",
"type": ["null", "long"], "default": null
},
{
"name": "notifications",
"type": ["null", "string"], "default": null
},
{
"name": "gender",
"type": ["null", "string"], "default": null
},
{
"name": "birthday",
"type": [
"null",
{
"type": "string",
"connect.version": 1,
"connect.name": "io.debezium.time.ZonedTimestamp"
}
],
"default": null
},
{
"name": "fathername",
"type": ["null", "string"], "default": null
},
{
"name": "married",
"type": [
"null",
{
"type": "int",
"connect.default": 0,
"connect.type": "int16"
}
],
"default": 0
},
{
"name": "military",
"type": [
"null",
{
"type": "int",
"connect.default": 0,
"connect.type": "int16"
}
],
"default": 0
},
{
"name": "nationalNumber",
"type": ["null", "string"], "default": null
},
{
"name": "nationalcardnumber",
"type": ["null", "string"], "default": null
},
{
"name": "phone",
"type": ["null", "string"], "default": null
},
{
"name": "mobile",
"type": ["null", "string"], "default": null
},
{
"name": "address",
"type": ["null", "string"], "default": null
},
{
"name": "workingtime",
"type": ["null", "string"], "default": null
},
{
"name": "workingType",
"type": ["null", "string"], "default": null
},
{
"name": "start_hour",
"type": [
"null",
{
"type": "string",
"connect.version": 1,
"connect.name": "io.debezium.time.ZonedTime"
}
],
"default": null
},
{
"name": "end_hour",
"type": [
"null",
{
"type": "string",
"connect.version": 1,
"connect.name": "io.debezium.time.ZonedTime"
}
],
"default": null
},
{
"name": "second_start_hour",
"type": [
"null",
{
"type": "string",
"connect.version": 1,
"connect.name": "io.debezium.time.ZonedTime"
}
],
"default": null
},
{
"name": "second_end_hour",
"type": [
"null",
{
"type": "string",
"connect.version": 1,
"connect.name": "io.debezium.time.ZonedTime"
}
],
"default": null
},
{
"name": "description",
"type": ["null", "string"], "default": null
},
{
"name": "left",
"type": [
{
"type": "int",
"connect.default": 0,
"connect.type": "int16"
},
"null"
],
"default": 0
},
{
"name": "verified_at",
"type": [
"null",
{
"type": "string",
"connect.version": 1,
"connect.name": "io.debezium.time.ZonedTimestamp"
}
],
"default": null
},
{
"name": "plate_number",
"type": ["null", "string"], "default": null
},
{
"name": "alternative_phone",
"type": ["null", "string"], "default": null
},
{
"name": "score",
"type": ["null", "int"], "default": null
},
{
"name": "ban_id",
"type": ["null", "long"], "default": null
},
{
"name": "banned_at",
"type": [
"null",
{
"type": "string",
"connect.version": 1,
"connect.name": "io.debezium.time.ZonedTimestamp"
}
],
"default": null
},
{
"name": "banned_till",
"type": [
"null",
{
"type": "string",
"connect.version": 1,
"connect.name": "io.debezium.time.ZonedTimestamp"
}
],
"default": null
},
{
"name": "invitation_card",
"type": [
{
"type": "int",
"connect.default": 0,
"connect.type": "int16"
},
"null"
],
"default": 0
},
{
"name": "received_invitation_transaction",
"type": [
{
"type": "int",
"connect.default": 0,
"connect.type": "int16"
},
"null"
],
"default": 0
},
{
"name": "transport_types",
"type": ["null", "string"], "default": null
},
{
"name": "last_filled_customer_id",
"type": ["null", "long"], "default": null
},
{
"name": "created_at",
"type": [
"null",
{
"type": "string",
"connect.version": 1,
"connect.name": "io.debezium.time.ZonedTimestamp"
}
],
"default": null
},
{
"name": "updated_at",
"type": [
"null",
{
"type": "string",
"connect.version": 1,
"connect.name": "io.debezium.time.ZonedTimestamp"
}
],
"default": null
},
{
"name": "deleted_at",
"type": [
"null",
{
"type": "string",
"connect.version": 1,
"connect.name": "io.debezium.time.ZonedTimestamp"
}
],
"default": null
},
{
"name": "document_number",
"type": ["null", "string"], "default": null
},
{
"name": "address_lat",
"type": ["null", "double"], "default": null
},
{
"name": "address_lng",
"type": ["null", "double"], "default": null
},
{
"name": "traffic_control",
"type": [
{
"type": "int",
"connect.default": 0,
"connect.type": "int16"
},
"null"
],
"default": 0
},
{
"name": "credit",
"type": ["null", "long"], "default": null
},
{
"name": "found_us",
"type": ["null", "string"], "default": null
},
{
"name": "credit_autopay",
"type": ["null", "long"], "default": null
}
],
"connect.name": "mysql.db.couriers.Value"
}
],
"default": null
},
{
"name": "after",
"type": [
"null",
"Value"
],
"default": null
},
{
"name": "source",
"type": {
"type": "record",
"name": "Source",
"namespace": "io.debezium.connector.mysql",
"fields": [
{
"name": "version",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "name",
"type": "string"
},
{
"name": "server_id",
"type": "long"
},
{
"name": "ts_sec",
"type": "long"
},
{
"name": "gtid",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "file",
"type": "string"
},
{
"name": "pos",
"type": "long"
},
{
"name": "row",
"type": "int"
},
{
"name": "snapshot",
"type": [
{
"type": "boolean",
"connect.default": false
},
"null"
],
"default": false
},
{
"name": "thread",
"type": [
"null",
"long"
],
"default": null
},
{
"name": "db",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "table",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "query",
"type": [
"null",
"string"
],
"default": null
}
],
"connect.name": "com.db.avro.io.debezium.connector.mysql.Source"
}
},
{
"name": "op",
"type": "string"
},
{
"name": "ts_ms",
"type": [
"null",
"long"
],
"default": null
}
],
"connect.name": "mysql.db.couriers.Envelope"
}
У меня есть другая таблицав основном похож на одно и другое приложение Kafka, которое делает то же самое успешно.Мне интересно, связана ли моя проблема с моим файлом avro, или это может быть только одна проблемная строка в базе данных.И если последнее, то есть ли в Confluent API способ пропустить такие строки вместо того, чтобы генерировать исключение, которое останавливает все мое приложение?