Apache NiFi / Cassandra - PutCassandraRecord не удалось преобразовать в объект записи - PullRequest
0 голосов
/ 14 мая 2019

Я пытаюсь использовать процессор PutCassandraRecord от NiFi, чтобы вставить некоторые записи JSON в базу данных Cassandra. Я пытаюсь вставить тип метки времени в Cassandra, но NiFi жалуется на NumberFormatException для входной строки "2019-02-02T08: 00: 00.000"

Тип данных кассандры для указанного поля отметки времени (ts отметка времени) Я использую схему Avro с: {"name": "ts", "type": {"type": "long", "logicType": "timestamp-millis"}}

{
  "name": "app.records",
  "type": "record",
  "fields": [
    { "name": "ts", "type": {"type": "long", "logicalType": "timestamp-millis"}},
    { "name": "app_name", "type": "string" },

Журналы NiFi показывают, что он может анализировать объект JSON, но не может преобразовать его в запись ...

2019-05-13 21:13:04,036 ERROR [Timer-Driven Process Thread-2] o.a.n.p.cassandra.PutCassandraRecord PutCassandraRecord[id=ecb33d77-cc4a-17f5-23a8-e002e1777a1c] Unable to write the records into Cassandra table due to org.apache.nifi.serialization.MalformedRecordException: Successfully parsed a JSON object from input but failed to convert into a Record object with the given schema: org.apache.nifi.serialization.MalformedRecordException: Successfully parsed a JSON object from input but failed to convert into a Record object with the given schema
org.apache.nifi.serialization.MalformedRecordException: Successfully parsed a JSON object from input but failed to convert into a Record object with the given schema
        at org.apache.nifi.json.AbstractJsonRowRecordReader.nextRecord(AbstractJsonRowRecordReader.java:98)
        at org.apache.nifi.serialization.RecordReader.nextRecord(RecordReader.java:50)
        at org.apache.nifi.processors.cassandra.PutCassandraRecord.onTrigger(PutCassandraRecord.java:151)
        at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
        at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1162)
        at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:209)
        at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
        at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NumberFormatException: For input string: "2019-02-02T08:00:35.473"
        at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
        at java.lang.Long.parseLong(Long.java:589)
        at java.lang.Long.parseLong(Long.java:631)
        at org.apache.nifi.serialization.record.util.DataTypeUtils.toTimestamp(DataTypeUtils.java:1057)
        at org.apache.nifi.serialization.record.util.DataTypeUtils.convertType(DataTypeUtils.java:156)
        at org.apache.nifi.serialization.record.util.DataTypeUtils.convertType(DataTypeUtils.java:120)
        at org.apache.nifi.json.JsonTreeRowRecordReader.convertField(JsonTreeRowRecordReader.java:170)
        at org.apache.nifi.json.JsonTreeRowRecordReader.convertJsonNodeToRecord(JsonTreeRowRecordReader.java:137)
        at org.apache.nifi.json.JsonTreeRowRecordReader.convertJsonNodeToRecord(JsonTreeRowRecordReader.java:83)
        at org.apache.nifi.json.JsonTreeRowRecordReader.convertJsonNodeToRecord(JsonTreeRowRecordReader.java:74)
        at org.apache.nifi.json.AbstractJsonRowRecordReader.nextRecord(AbstractJsonRowRecordReader.java:93)
        ... 14 common frames omitted

Типы, кажется, все правильно. Любая помощь будет оценена.

Ответы [ 2 ]

0 голосов
/ 15 мая 2019

Я закончил преобразование даты и времени в метку времени эпохи, преобразовал ее в миллисекунды и преобразовал ее в long, чтобы она могла работать с моей схемой Avro.

ts = datetime.datetime.strptime(strippedTime, '%Y-%m-%d %H:%M:%S.%f')
epoch = datetime.datetime(1970,1,1)
timestamp = long((ts-epoch).total_seconds()*1000)
fields['ts'] = timestamp
0 голосов
/ 14 мая 2019

Проблема в том, что вы пытаетесь вставить поле метки времени без указания формата даты. соответствующий код выглядит следующим образом:

Если входные данные являются строкой, попробуйте получить для нее строку форматирования, а затем, если строка форматирования является допустимым форматером, тогда получите датуиспользуй это.Если либо строка формата не указана, либо она недопустима, то NiFi пытается преобразовать ее, используя Long.parseLong.

. Вам необходимо либо выполнить явное приведение соответствующего поля с помощью чего-то вроде этого:

toDate("yyyy-MM-dd'T'hh:mm:ss")
...