Загрузить вложение в Confluence с помощью Nifi ExecuteScript в Python - PullRequest
1 голос
/ 28 июня 2019

Я пытаюсь загрузить файл PDF в Confluence, используя процессор ExecuteScript от Nifi. Я могу загрузить файл успешно, но когда я загружаю и открываю его, он становится пустым. Должно быть, что-то не так с моим обращением. Может кто-нибудь, пожалуйста, помогите проверить?

Вот как я это делаю:

  1. скачать файл PDF из внутреннего API Nifi attributes after I download the file
  2. ExecuteScript Groovy - для преобразования содержимого потокового файла в атрибут
import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets

flowFile = session.get()
if(!flowFile)return
def text = ''
session.read(flowFile, {inputStream ->
  text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
} as InputStreamCallback)

flowFile = session?.putAttribute(flowFile, "file_content", text)
session.transfer(flowFile, /*ExecuteScript.*/ REL_SUCCESS)

attribute file_content 3. ExecuteScript Python - для загрузки PDF-файла в Confluence

Вот мой код для # 3. Я думаю, что здесь что-то не так ->

import json
import requests
from requests_toolbelt.multipart.encoder import MultipartEncoder
from org.apache.nifi.processor.io import OutputStreamCallback

class OutputWrite(OutputStreamCallback):
  def __init__(self, obj):
  self.obj = obj
  def process(self, outputStream):
     outputStream.write(bytearray(json.dumps(self.obj).encode('utf-8')))

flowFile = session.get()
if (flowFile != None):
  url = 'https://myconfluence.com/rest/api/content/12345/child/attachment'
  auth = 'myauthorization'
  file_name = 'mypdf.pdf'
  file_content = flowFile.getAttribute('file_content')

  s = requests.Session()

  m = MultipartEncoder(fields={'file': (file_name, file_content, 'application/pdf')})
  headers = {"X-Atlassian-Token":"nocheck", "Authorization":auth, "Content-Type":m.content_type}

  r = s.post(url, data=m, headers=headers, verify=False)

  session.write(flowFile, OutputWrite(json.loads(r.text)))
  session.transfer(flowFile, REL_SUCCESS)
  session.commit()

ОБНОВЛЕНИЕ 06/28/2019

Я решил последовать совету Питера и объединить коды 1 и 2. Это все еще не работает. Раньше размер файла PDF составлял 2 МБ, но он пустой. Теперь его размер составляет 0 КБ. Любая помощь будет принята с благодарностью!

import json
import requests
from requests_toolbelt.multipart.encoder import MultipartEncoder
from org.apache.nifi.processor.io import OutputStreamCallback
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import InputStreamCallback

class PyInputStreamCallback(InputStreamCallback):
    def __init__(self):
        pass
    def process(self, inputStream):
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)

class OutputWrite(OutputStreamCallback):
    def __init__(self, obj):
        self.obj = obj
    def process(self, outputStream):
        outputStream.write(bytearray(json.dumps(self.obj).encode('utf-8')))

text = ''
flowFile = session.get()
if(flowFile != None):
    session.read(flowFile, PyInputStreamCallback())
    confluence_attachment_api = flowFile.getAttribute('confluence_attachment_api')
    confluence_authorization = flowFile.getAttribute('confluence_authorization')
    file_name = flowFile.getAttribute('file_name')

    s = requests.Session()
    m = MultipartEncoder(fields={'file': (file_name, text, 'application/pdf')})
    headers = {"X-Atlassian-Token":"nocheck", "Authorization":confluence_authorization, "Content-Type":m.content_type}
    r = s.post(confluence_attachment_api, data=m, headers=headers, verify=False)

    session.write(flowFile, OutputWrite(json.loads(r.text)))
    session.transfer(flowFile, REL_SUCCESS)
    session.commit()

1 Ответ

1 голос
/ 28 июня 2019

Не похоже, что вы на самом деле отправляете содержимое FlowFile. Вместо этого вы отправляете атрибут с именем file_content в качестве содержимого файла, что, вероятно, не то, что вы намеревались

Вам нужно будет сделать session.read, чтобы получить поток файлов. Приведенный ниже код не работает как есть, но показывает, как вы можете получить доступ к потоку.

class PyInputStreamCallback(InputStreamCallback):
  def __init__(self):
        pass
  def process(self, inputStream):
    m = MultipartEncoder(fields={'file': (file_name, inputStream, 'application/pdf')})

session.read(flowFile, PyInputStreamCallback())

Ссылка: https://community.hortonworks.com/articles/75545/executescript-cookbook-part-2.html

...