Как сериализовать метку времени в поле Json в процессоре NiFi ValidateRecord / JsonRecordSetWriter - PullRequest
0 голосов
/ 18 октября 2018

Как сериализовать метку времени в поле Json в процессоре NiFi ValidateRecord / JsonRecordSetWriter.

На входе у меня есть файл CSV со столбцом меток времени в формате yyyy-MM-dd HH:mm:ss.SSS.В моем NiFi Flow у меня есть ValidateRecord процессор, который использует CSVReader для чтения и JsonRecordSetWriter для записи.Оба они используют схему Avro с полем отметки времени, определенным как

"fields" : [ {
    "name" : "timestamp",
    "type" : {
            "type" : "long",
            "logicalType" : "timestamp-millis"
        },
    "doc" : "Type inferred from '2016/10/08 07:51:00.000'"
    }, {
    ...

Когда поступает запись со значением поля, например 2016-10-08 07:51:00.000, я получаю исключение в журналах NiFi:

2018-10-18 17:05:59,135 ERROR [Timer-Driven Process Thread-1] o.a.n.processors.standard.ValidateRecord ValidateRecord[id=3d44915d-a52a-3eb0-1ae1-7b0cbe4b1a03] Failed to write MapRecord[{timestamp=2016-10-08 07:51:00.0, ...  ] with schema {"type":"record","name":"redfunnel","doc":"Schema generated by Kite","fields":[{"name":"timestamp","type":{"type":"long","logicalType":"timestamp-millis"},"doc":"Type inferred from '2016/10/08 07:51:00.000'"},{ .... }]} as a JSON Object due to java.lang.IllegalStateException: No ObjectCodec defined for the generator, can only serialize simple wrapper types (type passed java.sql.Timestamp): java.lang.IllegalStateException: No ObjectCodec defined for the generator, can only serialize simple wrapper types (type passed java.sql.Timestamp)
java.lang.IllegalStateException: No ObjectCodec defined for the generator, can only serialize simple wrapper types (type passed java.sql.Timestamp)
    at org.codehaus.jackson.impl.JsonGeneratorBase._writeSimpleObject(JsonGeneratorBase.java:556)
    at org.codehaus.jackson.impl.JsonGeneratorBase.writeObject(JsonGeneratorBase.java:317)
    at org.apache.nifi.json.WriteJsonResult.writeRawValue(WriteJsonResult.java:267)
    at org.apache.nifi.json.WriteJsonResult.writeRecord(WriteJsonResult.java:201)
    at org.apache.nifi.json.WriteJsonResult.writeRawRecord(WriteJsonResult.java:149)
    at org.apache.nifi.processors.standard.ValidateRecord.onTrigger(ValidateRecord.java:342)
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1165)
    at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:203)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

В свойствах моего JsonRecordSetWriter я пытаюсь указать формат для записи метки времени как yyyy-MM-dd HH:mm:ss.SSS

enter image description here

но, к сожалению, безуспешно, я все еще получаю то же исключение в журналах NiFi.

Означает ли это, что JsonRecordSetWriter не может сериализовать java.time.Timestamp по умолчанию, даже если у него есть свойство Timestamp Format для настройки на первый взглядименно это?

Можно ли записать метку времени в соответствии с пользовательским форматом, используя готовые компоненты NiFi, или мне нужно изменить обновление JsonRecordSetWriter?

После выполнения кода мое исключение выдается из этой ветви кода .Похоже, что это ветка для недействительных записей, которые не прошли проверку.Может быть, моя ошибка возникает только на недействительных записях.

1 Ответ

0 голосов
/ 19 октября 2018

Кажется, я нашел конфигурацию, которая работает в моем случае.

Мне пришлось разделить схему на две части: одну для ввода и другую для вывода.

Итак schema1 определяет поле метки времени как:

{
    "name" : "timestamp",
    "type" : "string",
    "doc" : "Type inferred from '2016/10/08 07:51:00.000'"
}

и schema2 определяет поле метки времени как

{
    "name" : "timestamp",
    "type" : {
        "type" : "long",
        "logicalType" : "timestamp-millis"
    },
    "doc" : "Type inferred from '2016/10/08 07:51:00.000'"
}

Теперь я настраиваюValidateRecord процессор с

  • CSVReader, который использует schema1
  • JsonRecordSetWriter, который использует schema2
  • ValidateRecord's "«Схема текста» с schema1

После этого записи без ошибок проходят мой процессор ValidateRecord и попадают в поле timestamp базы данных Postgres с использованием процессора PutDatabaseRecordкоторый использует JsonTreeReader, настроенный с schema2 .

Также важно настроить свойство формата отметки времени JsonTreeReader с правильным форматом строки, например, 'yyyy-MM-dd HH: mm: ss.SSS 'в моем случае.

Надеюсь, это поможет в аналогичномобучение там.

...