Я новичок в Nifi и пытаюсь сделать POC в потоке ниже.
Я получаю сообщения XML из темы Какфа. Мне нужно, чтобы XML-сообщение получало несколько атрибутов и данных, представляющих собой сжатый формат GZIP из элементов XML, GZIP распаковывает данные (который снова является XML) и затем загружается в базу данных MySQL. Я пытаюсь это и застрял в следующем шаге.
(1) ConsumeKafka → (2) EvaluateXPath (flowfile-attribute = я установил несколько элементов XML в качестве атрибутов flowfile, что полезно в нисходящем направлении) → (3) EvaluateXPath (flowfile-content = получить данные gzip с использованием XPATH выражение = строка (// ABC / data) ) → (4) UpdateAttribute (mime.type = application / gzip) → (5) CompressContent (Формат сжатия = использовать атрибут mime.type и режим = распаковывать)
Мой CompressContent не работает с приведенным ниже исключением.
org.apache.nifi.processor.exception.ProcessException: IOException thrown from CompressContent[id=be4b9583-016e-1000-7cce-b9d822334c4c]: java.io.IOException: java.io.IOException: Input is not in the .gz format
Возможно, это связано с моим типом данных flowfile-contentиз (3) EvaluateXPath имеет значение String. Нужно ли преобразовывать строку в байт перед передачей в CompressContent? Если да, как я могу сделать это в той же (3) EvaluateXPath, используя какую-то функцию toBytes ()?
Заранее спасибо за вашу помощь !!!