Наконец-то я разработал решение.Сначала я объясню вопрос более подробно, потому что я действительно хотел знать, как реализовать объект или «кэш», который обновлялся очень часто или по какому-либо порядку, без необходимости перезапуска процесса потоковой передачи искры,то есть он будет обновляться в живом состоянии.
В моем случае этот «кэш» или обновленный объект является объектом (Singleton), который подключен к коллекции mongoDB для восстановления HashMap, который использовался каждым исполнителем и былКэшируется в памяти как хороший синглтон.Проблема с этим заключалась в том, что после выполнения потоковой отправки в кеше этот объект кэшировался в памяти, но он не обновлялся до тех пор, пока процесс не был перезапущен.Подумайте о трансляции как о режиме счетчика, который обновляется, когда переменная достигает 1000, но они доступны только для чтения и не могут быть изменены.Подумайте о счетчике, но он может быть прочитан только драйвером.
Наконец, мое решение заключается в том, что в блоке инициализации объекта, который загружает коллекцию Монго и кэш, я реализую это:
//Initialization Block
{
val ex = new ScheduledThreadPoolExecutor(1)
val task = new Runnable {
def run() = {
logger.info("Refresh - Inicialization")
initCache
}
}
val f = ex.scheduleAtFixedRate(task, 0, TIME_REFRES, TimeUnit.SECONDS)
}
initCache is nothing more than a function that connects mongo and loads a collection:
var cache = mutable.HashMap[String,Description]()
def initCache():mutable.HashMap[String, Description]={
val serverAddresses = Functions.getMongoServers(SERVER, PORT)
val mongoConnectionFactory = new MongoCollectionFactory(serverAddresses,DATABASE,COLLECTION,USERNAME,PASSWORD)
val collection = mongoConnectionFactory.getMongoCollection()
val docs = collection.find.iterator()
cache.clear()
while (docs.hasNext) {
var doc = docs.next
cache.put(...............
}
cache
}
Таким образом, после запуска отправки в потоковом режиме для каждой задачи каждая задача будет открывать еще одну задачу, которая будет заставлять каждый раз X (1 или 2 часа в моем случае) обновлять значение коллекции singleton, ивсегда восстанавливать это значение:
def getCache():mutable.HashMap[String, Description]={
cache
}