Пользовательский процессор Nifi, показывающий ошибку «локальный переменный файл потока не может быть назначен» - PullRequest
0 голосов
/ 15 апреля 2020

Я пытаюсь отправить исходный файл потока в качестве ввода на следующий процессор, но в итоге получаю сообщение об ошибке, я очень плохо знаком с Nifi, а также имею некоторый опыт в Java.

public void onTrigger(ProessorContext context, ProcessSessionsession) throws ProceException{
   FlowFile flowfile = session.get();
   if(flowfile == null){
        return;
    }

    ArrayList<String> headData = new ArrayList<String>();
    try{
        session.read(flowfile, new InputStreamCallback(){

        final DBCPService= context.getProperty(CONNECTION_POOL).asContollerService(DBCPService.class);
        String query = " CREATE TABLE MODEL (";
        @SuppressWarnings("deprecation")
        public void process(InputStream inputStream) throws IOException {
        try{
            OPCPackage pkg = OPCPackage.open(inputStream);
            XSSFWorkbook workbook = new XSSFWorkbook(pkg);
            workbook.getAllNames();
            String dateheader = "date"
            XSSFSheet sheetName = workbook.getSheet(0);
            Row row = sheetName.getRow(0);
            for(Cell cell: row) {
                switch (cell.getCellType()){
                case NUMERIC:
                    if(HSSDataUtil.isCellDateFromated(cell)){
                        DataFormatter dataFromatter = new DataFormatter();
                        headData.add(dataFromatter.formatCellValue(cell);
                        query +=dataFromatter.formatCellValue(cell)+" " + "INT" ;
                    }else{
                        headData.add(String.valueOf(cell.getNumericCellValue()));
                    }
                    break;
                    case STRING:
                        headData.add(cell.getStringCellValue());
                        if(cell.getStringCellValue().toLowerCase().contains(dateheader))
                            query += cell.getStringCellValue() + " " + "TIMESTAMP,";
                        else
                            query +=cell.getStringCellValue() + " + "VARCHAR(50),";
                            break;
                    case BOOLEAN:
                        headData.add(String.valueOf(cell.getBooleanCellValue());
                        break;
                    default:
                        headData.add("");
                        break;
                        }
                    }
                    query = query.substring(0, query.length() -1);
                    query += ")";
                    workbook.close();
                    final Connection con = DBCPService.getConnection();
                    try{
                        java.sql.PreparedStatement = con.prepareStatement(query);
                        PreparedStatement.execute();
                        con.commit();
                        session.transfer(flowfile, REL_SUCCESS);
                    }catch (SQL Exception e){
                        e.printStackTrace();
                        session.transfer(flowfile, REL_FAILURE);
                    }
                }catch(InvalidFromatException ife){
                    getLogger().error(" only .xlsx excel files are supprted", ife);
                    thrownew UnsupportedOperationException("Only .xlsx OOXML files are substring", ife);
                }
            }

        });
        {catch (RuntimeException ex) {
            getLogger().error("Failed to process incoming Excel document. " + ex.getMessage(), ex);
            FlowFile failedFlowFile = session.putAttribute(flowfile, testxlsqlProcessor.class.getMessage());
        }
        final StringBuilder stringBuilder = new StringBuilder();
        flowfile = session.write(flowfile, new StreamCallback(){
        public void process(InputStream in, OutputStream out) throws IOException{
            stringBuilder.append(IOUtils.copy(in,out));
        }
    });

    }
}

Если я не добавляйте выходной поток, я получаю исключительное отношение переноса.

1 Ответ

0 голосов
/ 16 апреля 2020

Вы не захотите передавать файл потока изнутри InputStreamCallback, что должно произойти после того, как вы закончили чтение из файла потока. Если вы не изменяете содержимое файла исходящего потока, то в конце вам также не нужны вещи StreamCallback и IOUtils.copy (), вы можете просто передать исходный файл потока. Для случаев сбоя вы можете выбросить IOException, обертывающее реальное исключение в InputStreamCallback, перехватить его снаружи, а затем передать исходный файл потока в сбой. Если исключение не происходит, вы можете успешно передать исходный файл потока.

...