Я пытаюсь отправить исходный файл потока в качестве ввода на следующий процессор, но в итоге получаю сообщение об ошибке, я очень плохо знаком с Nifi, а также имею некоторый опыт в Java.
public void onTrigger(ProessorContext context, ProcessSessionsession) throws ProceException{
FlowFile flowfile = session.get();
if(flowfile == null){
return;
}
ArrayList<String> headData = new ArrayList<String>();
try{
session.read(flowfile, new InputStreamCallback(){
final DBCPService= context.getProperty(CONNECTION_POOL).asContollerService(DBCPService.class);
String query = " CREATE TABLE MODEL (";
@SuppressWarnings("deprecation")
public void process(InputStream inputStream) throws IOException {
try{
OPCPackage pkg = OPCPackage.open(inputStream);
XSSFWorkbook workbook = new XSSFWorkbook(pkg);
workbook.getAllNames();
String dateheader = "date"
XSSFSheet sheetName = workbook.getSheet(0);
Row row = sheetName.getRow(0);
for(Cell cell: row) {
switch (cell.getCellType()){
case NUMERIC:
if(HSSDataUtil.isCellDateFromated(cell)){
DataFormatter dataFromatter = new DataFormatter();
headData.add(dataFromatter.formatCellValue(cell);
query +=dataFromatter.formatCellValue(cell)+" " + "INT" ;
}else{
headData.add(String.valueOf(cell.getNumericCellValue()));
}
break;
case STRING:
headData.add(cell.getStringCellValue());
if(cell.getStringCellValue().toLowerCase().contains(dateheader))
query += cell.getStringCellValue() + " " + "TIMESTAMP,";
else
query +=cell.getStringCellValue() + " + "VARCHAR(50),";
break;
case BOOLEAN:
headData.add(String.valueOf(cell.getBooleanCellValue());
break;
default:
headData.add("");
break;
}
}
query = query.substring(0, query.length() -1);
query += ")";
workbook.close();
final Connection con = DBCPService.getConnection();
try{
java.sql.PreparedStatement = con.prepareStatement(query);
PreparedStatement.execute();
con.commit();
session.transfer(flowfile, REL_SUCCESS);
}catch (SQL Exception e){
e.printStackTrace();
session.transfer(flowfile, REL_FAILURE);
}
}catch(InvalidFromatException ife){
getLogger().error(" only .xlsx excel files are supprted", ife);
thrownew UnsupportedOperationException("Only .xlsx OOXML files are substring", ife);
}
}
});
{catch (RuntimeException ex) {
getLogger().error("Failed to process incoming Excel document. " + ex.getMessage(), ex);
FlowFile failedFlowFile = session.putAttribute(flowfile, testxlsqlProcessor.class.getMessage());
}
final StringBuilder stringBuilder = new StringBuilder();
flowfile = session.write(flowfile, new StreamCallback(){
public void process(InputStream in, OutputStream out) throws IOException{
stringBuilder.append(IOUtils.copy(in,out));
}
});
}
}
Если я не добавляйте выходной поток, я получаю исключительное отношение переноса.