Как сериализовать метку времени в поле Json в процессоре NiFi ValidateRecord
/ JsonRecordSetWriter
.
На входе у меня есть файл CSV со столбцом меток времени в формате yyyy-MM-dd HH:mm:ss.SSS
.В моем NiFi Flow у меня есть ValidateRecord
процессор, который использует CSVReader
для чтения и JsonRecordSetWriter
для записи.Оба они используют схему Avro с полем отметки времени, определенным как
"fields" : [ {
"name" : "timestamp",
"type" : {
"type" : "long",
"logicalType" : "timestamp-millis"
},
"doc" : "Type inferred from '2016/10/08 07:51:00.000'"
}, {
...
Когда поступает запись со значением поля, например 2016-10-08 07:51:00.000
, я получаю исключение в журналах NiFi:
2018-10-18 17:05:59,135 ERROR [Timer-Driven Process Thread-1] o.a.n.processors.standard.ValidateRecord ValidateRecord[id=3d44915d-a52a-3eb0-1ae1-7b0cbe4b1a03] Failed to write MapRecord[{timestamp=2016-10-08 07:51:00.0, ... ] with schema {"type":"record","name":"redfunnel","doc":"Schema generated by Kite","fields":[{"name":"timestamp","type":{"type":"long","logicalType":"timestamp-millis"},"doc":"Type inferred from '2016/10/08 07:51:00.000'"},{ .... }]} as a JSON Object due to java.lang.IllegalStateException: No ObjectCodec defined for the generator, can only serialize simple wrapper types (type passed java.sql.Timestamp): java.lang.IllegalStateException: No ObjectCodec defined for the generator, can only serialize simple wrapper types (type passed java.sql.Timestamp)
java.lang.IllegalStateException: No ObjectCodec defined for the generator, can only serialize simple wrapper types (type passed java.sql.Timestamp)
at org.codehaus.jackson.impl.JsonGeneratorBase._writeSimpleObject(JsonGeneratorBase.java:556)
at org.codehaus.jackson.impl.JsonGeneratorBase.writeObject(JsonGeneratorBase.java:317)
at org.apache.nifi.json.WriteJsonResult.writeRawValue(WriteJsonResult.java:267)
at org.apache.nifi.json.WriteJsonResult.writeRecord(WriteJsonResult.java:201)
at org.apache.nifi.json.WriteJsonResult.writeRawRecord(WriteJsonResult.java:149)
at org.apache.nifi.processors.standard.ValidateRecord.onTrigger(ValidateRecord.java:342)
at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1165)
at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:203)
at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
В свойствах моего JsonRecordSetWriter
я пытаюсь указать формат для записи метки времени как yyyy-MM-dd HH:mm:ss.SSS
но, к сожалению, безуспешно, я все еще получаю то же исключение в журналах NiFi.
Означает ли это, что JsonRecordSetWriter
не может сериализовать java.time.Timestamp
по умолчанию, даже если у него есть свойство Timestamp Format
для настройки на первый взглядименно это?
Можно ли записать метку времени в соответствии с пользовательским форматом, используя готовые компоненты NiFi, или мне нужно изменить обновление JsonRecordSetWriter
?
После выполнения кода мое исключение выдается из этой ветви кода .Похоже, что это ветка для недействительных записей, которые не прошли проверку.Может быть, моя ошибка возникает только на недействительных записях.