Обновление кэшированных значений в потоковой передаче без перезагрузки пакета - PullRequest
0 голосов
/ 17 декабря 2018

Может быть, вопрос слишком прост, по крайней мере, так, но у меня есть следующая проблема:

A.Выполнить подачу искры в процессе искрового потока.

 ccc.foreachRDD(rdd => {
          rdd.repartition(20).foreachPartition(p => {

               val repo = getReposX
               t foreach (g => {
    .................

B.getReposX - это функция, которая выполняет запрос в mongoDB, восстанавливая карту с ключом / значением, необходимым каждому исполнителю процесса.

C.В каждом g в foreach я управляю этой картой "кэшированной"

Проблема или вопрос в том, что когда в коллекции Монго что-то меняется, я не наблюдаю или не обнаруживаю изменение, поэтому яУправление картой не обновлено.Мой вопрос: как я могу получить это?Да, я знаю, если я перезагружаю иск-отправку и драйвер запускается снова, все в порядке, но в противном случае я никогда не увижу обновление на моей карте.

Есть идеи или предложения?С уважением.

1 Ответ

0 голосов
/ 17 декабря 2018

Наконец-то я разработал решение.Сначала я объясню вопрос более подробно, потому что я действительно хотел знать, как реализовать объект или «кэш», который обновлялся очень часто или по какому-либо порядку, без необходимости перезапуска процесса потоковой передачи искры,то есть он будет обновляться в живом состоянии.

В моем случае этот «кэш» или обновленный объект является объектом (Singleton), который подключен к коллекции mongoDB для восстановления HashMap, который использовался каждым исполнителем и былКэшируется в памяти как хороший синглтон.Проблема с этим заключалась в том, что после выполнения потоковой отправки в кеше этот объект кэшировался в памяти, но он не обновлялся до тех пор, пока процесс не был перезапущен.Подумайте о трансляции как о режиме счетчика, который обновляется, когда переменная достигает 1000, но они доступны только для чтения и не могут быть изменены.Подумайте о счетчике, но он может быть прочитан только драйвером.

Наконец, мое решение заключается в том, что в блоке инициализации объекта, который загружает коллекцию Монго и кэш, я реализую это:

    //Initialization Block
      {
          val ex = new ScheduledThreadPoolExecutor(1)
          val task = new Runnable {
            def run() = {
              logger.info("Refresh - Inicialization")
              initCache
            }
          }
          val f = ex.scheduleAtFixedRate(task, 0, TIME_REFRES, TimeUnit.SECONDS)
       }


initCache is nothing more than a function that connects mongo and loads a collection:

    var cache = mutable.HashMap[String,Description]()
    def initCache():mutable.HashMap[String, Description]={
           val serverAddresses = Functions.getMongoServers(SERVER, PORT)

        val mongoConnectionFactory = new MongoCollectionFactory(serverAddresses,DATABASE,COLLECTION,USERNAME,PASSWORD)

        val collection = mongoConnectionFactory.getMongoCollection()

        val docs = collection.find.iterator()
        cache.clear()
        while (docs.hasNext) {

          var doc = docs.next

          cache.put(...............
        }

        cache
      }

Таким образом, после запуска отправки в потоковом режиме для каждой задачи каждая задача будет открывать еще одну задачу, которая будет заставлять каждый раз X (1 или 2 часа в моем случае) обновлять значение коллекции singleton, ивсегда восстанавливать это значение:

def getCache():mutable.HashMap[String, Description]={
      cache
}
...