NiFi Чтение и изменение содержимого файла - PullRequest
1 голос
/ 09 апреля 2019

Я новичок в Нифи. Я работаю над NiFiProject, который читает содержимое файла и делает некоторые ETL. Результаты нужно поместить в другой файл.

Я получаю отношения не удовлетворены ошибка:

MyspanishprocessorIid-b673bb80-0169-1 ooo-2f8a-c22081380d29 Myspanishprocessodidzb673bb80-0169-1000-2f8a-c22081380d29 не удалось сеанс обработки из-за StandardFlowFileRecordluuidze8ee1374-ef25-43d5-b35e- ac76dba0955c, claimzStandardContentClaim (resourceClaimzStandardResourceClaim (idz1554235475648-1, containerzdefault, сечение - Il, смещение; O, трансферные отношения не указаны; Процессор с административной отдачей за 1 сек: org.apache.nifi.processor.exception.FlowFileHandlingExceptlon: StandardFlowFileRecordluuidze8ee1374-ef25-43d5-b35e- ac76dba0955c, claimzStandardContentClaim (resourceClaimzStandardResourceClaim (idz1554235475648-1, контейнер по умолчанию, смещение сечения; O, Передаточные отношения не указаны

Код, который я написал:

 @Tags({"spanish"})
@CapabilityDescription("Spanish processor")
@SeeAlso({})
@ReadsAttributes({@ReadsAttribute(attribute="", description="")})
@WritesAttributes({@WritesAttribute(attribute="", description="")})
public class MySpanishProcessor extends AbstractProcessor {
    public static final PropertyDescriptor MY_PROPERTY = new PropertyDescriptor
            .Builder().name("MY_PROPERTY")
            .displayName("My property")
            .description("Example Property")
            .required(false)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    public static final Relationship REL_MATCH = new Relationship.Builder()
            .name("matched")
            .description("FlowFiles are routed to this relationship when the Regular Expression is successfully evaluated and the FlowFile is modified as a result")
            .build();
    public static final Relationship REL_NO_MATCH = new Relationship.Builder()
            .name("unmatched")
            .description("FlowFiles are routed to this relationship when no provided Regular Expression matches the content of the FlowFile")
            .build();

    private List<PropertyDescriptor> descriptors;

    private Set<Relationship> relationships;

    @Override
    protected void init(final ProcessorInitializationContext context) {
        final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
        descriptors.add(MY_PROPERTY);
        this.descriptors = Collections.unmodifiableList(descriptors);

        final Set<Relationship> relationships = new HashSet<Relationship>();
        relationships.add(REL_MATCH);
        relationships.add(REL_NO_MATCH);
        this.relationships = Collections.unmodifiableSet(relationships);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return descriptors;
    }

    @OnScheduled
    public void onScheduled(final ProcessContext context) {

    }
    Table officeTable = null;
    Table legalEntitytable = null;
    Table citiesTable = null;
    Table joinOfOfficeLegalCityTable = null;
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        getLogger().debug("In the Trigger");
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
         //Lets read the file using the call back
        ArrayList<String> lineList= new ArrayList<>();
        session.read(flowFile, new InputStreamCallback() {
            @Override
            public void process(InputStream inputStream) throws IOException {
                BufferedReader bufferedReader= new BufferedReader(new InputStreamReader(inputStream));
                String line;
                while ((line=bufferedReader.readLine())!=null)
                {lineList.add(line);}
            }
        });

        FlowFile flowFile1=session.create();
        session.write(flowFile1, new OutputStreamCallback() {
            @Override
            public void process(OutputStream outputStream) throws IOException {
                outputStream.write("No Data".getBytes());
            }
        });
//        session.getProvenanceReporter().modifyAttributes(flowFile1);
        session.transfer(flowFile1, REL_MATCH);//needs to be called to transfer
    }
}

Ответы [ 2 ]

0 голосов
/ 15 апреля 2019

После некоторых проб и ошибок сработал следующий код.

 @Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    getLogger().debug("In the Trigger");
    FlowFile flowFile = session.get();
    if (flowFile == null) {
        return;
    }
    //Lets read the file using the call back
    ArrayList<String> lineList = new ArrayList<>();
  final SpanishCodeFilePreprocessor spanishCodeFilePreprocessor = new SpanishCodeFilePreprocessor();
    try {
        session.read(flowFile, new InputStreamCallback() {
            @Override
            public void process(InputStream inputStream) throws IOException {
                BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
                String line;
                while ((line = bufferedReader.readLine()) != null) {
                    spanishCodeFilePreprocessor.identifyRecordTypeAndProcessIt(line);
                }
            }
        });
    }
    catch (Exception e)
    {
        getLogger().error(e.toString());
    }
    try {
        session.write(flowFile, new OutputStreamCallback() {
            @Override
            public void process(OutputStream outputStream) throws IOException {
                officeTable=spanishCodeFilePreprocessor.getOfficeTable();

                String s = "Office Table size: " + String.valueOf(officeTable.shape());
                officeTable.write().csv(outputStream);
            }
        });
        session.getProvenanceReporter().modifyAttributes(flowFile);
        session.transfer(flowFile, REL_MATCH);//needs to be called to transfer
    } catch (Exception e) {
        getLogger().error("Exception in spanishProcessor");
        session.write(flowFile, new OutputStreamCallback() {
            @Override
            public void process(OutputStream outputStream) throws IOException {
                String s = "Office Table size: 0";
                outputStream.write(s.getBytes());
            }
        });
        session.getProvenanceReporter().modifyAttributes(flowFile);
        session.transfer(flowFile, REL_NO_MATCH);//needs to be called to transfer
    }


}
0 голосов
/ 09 апреля 2019

Каждый файл потока должен быть учтен, что означает, что любой файл потока, созданный из session.create или полученный из session.get, должен быть передан или удален.

Результат любого session.write или session.putAttribute вернет новую ссылку на файл потока, которую необходимо отслеживать. Итак ...

FlowFile flowFile1=session.create();
flowFile1 = session.write(flowFile1, new OutputStreamCallback() {

Затем необходимо передать flowFile1.

...