Чтение из кэша Infinispan заходит в бесконечный цикл - PullRequest
0 голосов
/ 05 мая 2019

Я использую Nifi для создания конвейера потока данных, где я использую Infinispan a сервер кэширования. Но когда я использую исполняемый скрипт со скриптом Groovy, он идет по бесконечному циклу и открывает много соединений с сокетами. Я пытался закрыть то же самое, но все равно он открывает много соединений, а затем выдает

java.net.SocketException: No buffer space available (maximum connections reached?): connect

Перейдя по ссылке ниже, я изменил реестр https://support.pitneybowes.com/VFP06_KnowledgeWithSidebarTroubleshoot?id=kA280000000PEE1CAO&popup=false&lang=en_US

Затем проверил открытые соединения с помощью netstat -n I открывает 65534 из-за вышеуказанных настроек.

Ниже приведен скрипт для чтения из кэша Infinispan

import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
import org.apache.commons.io.IOUtils;
import java.nio.charset.StandardCharsets;

def cacheName = "mycache"

def configuration = new ConfigurationBuilder()
.addServer().host("localhost").port(11322).build();

def cacheManager = new RemoteCacheManager(configuration)

RemoteCache cacheA = cacheManager.getCache(cacheName)

flowFile = session.get()
if(!flowFile) return
key = flowFile.getAttribute('key')
id = flowFile.getAttribute('id')
jsonFromCache = cacheA.get(key + "_" + id);
if(cacheA != null) {
cacheA.stop()
}
if(cacheManager != null) {
cacheManager.stop()
}

flowFile = session.write(flowFile, {outputStream ->
  outputStream.write(jsonFromCache.getBytes(StandardCharsets.UTF_8))
} as OutputStreamCallback)
session.transfer(flowFile, REL_SUCCESS)

1 Ответ

2 голосов
/ 05 мая 2019

Вы открываете соединение с кешем до получения файла из сессии.

Итак, вы открываете соединение и в следующей строке просто выходите из скрипта, не закрывая его:

if(!flowFile) return

Еще один момент: Вы можете использовать процессор ExecuteGroovyScript. Затем можно запустить и остановить процессор. Пример вы можете найти здесь: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-groovyx-nar/1.9.2/org.apache.nifi.processors.groovyx.ExecuteGroovyScript/additionalDetails.html

import org.apache.nifi.processor.ProcessContext
import java.util.concurrent.atomic.AtomicLong

class Const{
  static Date startTime = null;
  static AtomicLong triggerCount = null;
}

static onStart(ProcessContext context){
  Const.startTime = new Date()
  Const.triggerCount = new AtomicLong(0)
  println "onStart $context ${Const.startTime}"
}

static onStop(ProcessContext context){
  def alive = (System.currentTimeMillis() - Const.startTime.getTime()) / 1000
  println "onStop $context executed ${ Const.triggerCount } times during ${ alive } seconds"
}

def flowFile = session.get()
if(!flowFile)return
flowFile.'trigger.count' = Const.triggerCount.incrementAndGet()
REL_SUCCESS << flowFile
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...