Я новичок в Нифи. Я работаю над NiFiProject, который читает содержимое файла и делает некоторые ETL. Результаты нужно поместить в другой файл.
Я получаю отношения не удовлетворены ошибка:
MyspanishprocessorIid-b673bb80-0169-1 ooo-2f8a-c22081380d29
Myspanishprocessodidzb673bb80-0169-1000-2f8a-c22081380d29 не удалось
сеанс обработки из-за StandardFlowFileRecordluuidze8ee1374-ef25-43d5-b35e-
ac76dba0955c, claimzStandardContentClaim
(resourceClaimzStandardResourceClaim (idz1554235475648-1, containerzdefault,
сечение - Il, смещение; O,
трансферные отношения не указаны; Процессор с административной отдачей за 1 сек:
org.apache.nifi.processor.exception.FlowFileHandlingExceptlon:
StandardFlowFileRecordluuidze8ee1374-ef25-43d5-b35e-
ac76dba0955c, claimzStandardContentClaim
(resourceClaimzStandardResourceClaim (idz1554235475648-1, контейнер по умолчанию,
смещение сечения; O,
Передаточные отношения не указаны
Код, который я написал:
@Tags({"spanish"})
@CapabilityDescription("Spanish processor")
@SeeAlso({})
@ReadsAttributes({@ReadsAttribute(attribute="", description="")})
@WritesAttributes({@WritesAttribute(attribute="", description="")})
public class MySpanishProcessor extends AbstractProcessor {
public static final PropertyDescriptor MY_PROPERTY = new PropertyDescriptor
.Builder().name("MY_PROPERTY")
.displayName("My property")
.description("Example Property")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final Relationship REL_MATCH = new Relationship.Builder()
.name("matched")
.description("FlowFiles are routed to this relationship when the Regular Expression is successfully evaluated and the FlowFile is modified as a result")
.build();
public static final Relationship REL_NO_MATCH = new Relationship.Builder()
.name("unmatched")
.description("FlowFiles are routed to this relationship when no provided Regular Expression matches the content of the FlowFile")
.build();
private List<PropertyDescriptor> descriptors;
private Set<Relationship> relationships;
@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
descriptors.add(MY_PROPERTY);
this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<Relationship>();
relationships.add(REL_MATCH);
relationships.add(REL_NO_MATCH);
this.relationships = Collections.unmodifiableSet(relationships);
}
@Override
public Set<Relationship> getRelationships() {
return this.relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}
@OnScheduled
public void onScheduled(final ProcessContext context) {
}
Table officeTable = null;
Table legalEntitytable = null;
Table citiesTable = null;
Table joinOfOfficeLegalCityTable = null;
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
getLogger().debug("In the Trigger");
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
//Lets read the file using the call back
ArrayList<String> lineList= new ArrayList<>();
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(InputStream inputStream) throws IOException {
BufferedReader bufferedReader= new BufferedReader(new InputStreamReader(inputStream));
String line;
while ((line=bufferedReader.readLine())!=null)
{lineList.add(line);}
}
});
FlowFile flowFile1=session.create();
session.write(flowFile1, new OutputStreamCallback() {
@Override
public void process(OutputStream outputStream) throws IOException {
outputStream.write("No Data".getBytes());
}
});
// session.getProvenanceReporter().modifyAttributes(flowFile1);
session.transfer(flowFile1, REL_MATCH);//needs to be called to transfer
}
}