Как я могу пометить сообщения в нифи? - PullRequest
0 голосов
/ 25 января 2019

Отказ от ответственности: я абсолютно ничего не знаю о nifi.

Мне нужно получать сообщения от процессора ListenHTTP, а затем преобразовывать каждое сообщение в сообщение json с меткой времени.

Итак, скажем, я получаю сообщение hello world в 5 часов утра. Это должно преобразовать это в {"timestamp": "5 am", "message":"hello world"}.

Как мне это сделать?

Ответы [ 2 ]

0 голосов
/ 25 января 2019

Каждый потоковый файл имеет атрибутов , которые представляют собой фрагменты метаданных, хранящихся в парах ключ / значение в памяти (доступных для быстрого чтения / записи).Когда происходит какая-либо операция, фрагменты метаданных записываются средой NiFi как для событий Происхождение , связанных с потоковым файлом, так и иногда с самим потоковым файлом.Например, если ListenHTTP является первым процессором в потоке, любой файл потока, который входит в поток, будет иметь атрибут entryDate со значением времени, когда он возник в формате Thu Jan 24 15:53:52 PST 2019.Вы можете читать и записывать эти атрибуты на различных процессорах (например, UpdateAttribute, RouteOnAttribute и т. Д.).

В вашем случае вы можете использовать процессор ReplaceText, следующий сразу за процессором ListenHTTP со значением поиска (?s)(^.*$) (весь контент потокового файла или «то, что вы получили через вызов HTTP»)и восстановительная стоимость {"timestamp_now":"${now():format('YYYY-MM-dd HH:mm:ss.SSS Z')}", "timestamp_ed": "${entryDate:format('YYYY-MM-dd HH:mm:ss.SSS Z')}", "message":"$1"}.

В приведенном выше примере предусмотрены две опции:

  1. * * * * * * * * * * * * * * * * * * * * * * * * * * * * *

Эти два значения могут незначительно отличаться в зависимости от производительности / очереди / и т. д.В моем простом примере они были на расстоянии 2 миллисекунды.Вы можете отформатировать их, используя метод format() и обычный синтаксис формата времени Java , так что вы можете получить «5 утра», например, с помощью h a (полный пример: now():format('h a'):toLower()).

Пример

  • ListenHTTP работает на порту 9999 с путем contentListener
  • ReplaceText как указано выше
  • LogAttribute с полезной нагрузкой журнала true

NiFi flow on canvas and terminal showing log and curl command

Команда скручивания: curl -d "helloworld" -X POST http://localhost:9999/contentListener

Пример вывода:

2019-01-24 16:04:44,529 INFO [Timer-Driven Process Thread-6] o.a.n.processors.standard.LogAttribute LogAttribute[id=8246b0a0-0168-1000-7254-2c2e43d136a7] logging for flow file StandardFlowFileRecord[uuid=5e1c6d12-298d-4d9c-9fcb-108c208580fa,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1548374015429-1, container=default, section=1], offset=3424, length=122],offset=0,name=5e1c6d12-298d-4d9c-9fcb-108c208580fa,size=122]
--------------------------------------------------
Standard FlowFile Attributes
Key: 'entryDate'
    Value: 'Thu Jan 24 16:04:44 PST 2019'
Key: 'lineageStartDate'
    Value: 'Thu Jan 24 16:04:44 PST 2019'
Key: 'fileSize'
    Value: '122'
FlowFile Attribute Map Content
Key: 'filename'
    Value: '5e1c6d12-298d-4d9c-9fcb-108c208580fa'
Key: 'path'
    Value: './'
Key: 'restlistener.remote.source.host'
    Value: '127.0.0.1'
Key: 'restlistener.remote.user.dn'
    Value: 'none'
Key: 'restlistener.request.uri'
    Value: '/contentListener'
Key: 'uuid'
    Value: '5e1c6d12-298d-4d9c-9fcb-108c208580fa'
--------------------------------------------------
{"timestamp_now":"2019-01-24 16:04:44.518 -0800", "timestamp_ed": "2019-01-24 16:04:44.516 -0800", "message":"helloworld"}
0 голосов
/ 25 января 2019

Итак, я добавил ExecuteScript процессор с этим кодом:

import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets
import java.time.LocalDateTime

flowFile = session.get()
if(!flowFile)return
def text = ''
// Cast a closure with an inputStream parameter to InputStreamCallback
session.read(flowFile, {inputStream ->
  text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
  // Do something with text here
} as InputStreamCallback)


def outputMessage = '{\"timestamp\":\"' + LocalDateTime.now().toString() + '\", \"message:\":\"' + text + '\"}'

flowFile = session.write(flowFile, {inputStream, outputStream ->
  text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
  outputStream.write(outputMessage.getBytes(StandardCharsets.UTF_8))
} as StreamCallback)

session.transfer(flowFile, REL_SUCCESS)

и это сработало.

...