Нифи настроенный процессор не реагирует и не показывает никаких проблем - PullRequest
0 голосов
/ 08 апреля 2020

Я работаю с Nifi и пытаюсь создать процессор Apache Nifi, но он не реагирует! Процессор построен, он отображается в списке процессоров и работает, но без каких-либо входов! это ничего мне не показывает.
Вот мой код:

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {

        final AtomicReference<String> value = new AtomicReference<>();    
        Integer Somme=new Integer(1254+5520);

         FlowFile flowFile = session.get();
         session.read(flowFile, new InputStreamCallback() {
             @Override
             public void process(InputStream in) throws IOException {
                 HttpGet request = new HttpGet(URL);

                 HttpHost target = new HttpHost(localhost, 8080, "http");
                 CredentialsProvider provider = new BasicCredentialsProvider();
                 provider.setCredentials(
                         new AuthScope(target.getHostName(), target.getPort()),
                         new UsernamePasswordCredentials(Username, paswd)
                 );

                 AuthCache authCache = new BasicAuthCache();
                 authCache.put(target, new BasicScheme());

                 HttpClientContext localContext = HttpClientContext.create();
                 localContext.setAuthCache(authCache);

                 try (CloseableHttpClient httpClient = HttpClientBuilder.create()
                         .setDefaultCredentialsProvider(provider)
                         .build();
                      CloseableHttpResponse response = httpClient.execute(target, request, localContext)) {

                     // 401 if wrong user/password
                     System.out.println(response.getStatusLine().getStatusCode());

                     HttpEntity entity = response.getEntity();
                     if (entity != null) {
                         // return it as a String
                         String result = EntityUtils.toString(entity);
                         System.out.println(result.getBytes());
                         value.set(result);
                     }

                 }

             }
         });

      // Write the results to an attribute
         String results = value.get();
         if(results != null && !results.isEmpty()){
             flowFile = session.putAttribute(flowFile, "match", results);
         }

         // To write the results back out ot flow file
         flowFile = session.write(flowFile, new OutputStreamCallback() {

             @Override
             public void process(OutputStream out) throws IOException {
                 out.write(value.get().getBytes());
             }
         });

         session.transfer(flowFile, Success);

    }

}

Любая помощь, пожалуйста! Прошло уже несколько дней, и, кажется, ничего не подходит :(

1 Ответ

1 голос
/ 08 апреля 2020

При реализации исходного процессора (то есть ввода нет), затем вы хотите использовать session.create () для создания нового файла потока. Использование session.get () попытается получить файл потока из входящих очередей, но, так как их нет, он всегда будет нулевым.

...