Следующее поле вызывает сбой моего приложения Kafka Stream при попытке создать сообщение.
{
"name" : "TS",
"type" : {
"type" : "long",
"logicalType" : "timestamp-millis"
},
"doc" : "date time indicating the timestamp of the position (in UTC)"
}
Ниже приведена трассировка стека, когда Stream пытается записать сообщение в тему вывода
Caused by: org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: org.apache.avro.AvroRuntimeException: Unknown datum type org.joda.time.DateTime: 2018-02-12T06:14:37.000+05:00
at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:772)
at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:302)
at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:737)
at org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:205)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:123)
at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:87)
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:101)
Serdes, использованный для создания сообщения, создается как
final Map<String, String> serdeConfig = Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,"http://localhost:8081/");
final Serde<GenericRecord> valueGenericAvroSerde = new GenericAvroSerde();
valueGenericAvroSerde.configure(serdeConfig, false); // `false` for record values