Запись в файл под NFS в приложении Grails с использованием Quartz и RabbitMQ - PullRequest
1 голос
/ 02 марта 2011

У нас есть приложение Grails, которое использует RabbitMQ для передачи сообщений. Архитектура для производственной среды:
- 2 веб-сервера, скажем, web1, web2. Оба они запускают экземпляр приложения Grails
- сервер RabbitMQ установлен только на одном из серверов, web1
- В конфигурации RabbitMQ в приложении Grails у нас есть 10 потребителей (на экземпляр приложения)

У нас есть задание Quartz, которое создает некоторые сообщения и передает их в очередь, а у нас есть служба поддержки пользователей, которая обрабатывает поступающие в очередь сообщения.
1. Как мы можем определить, какой сервер (экземпляр приложения) должен выполнить задание Quartz? Я предполагаю, что есть только один сервер, на котором работает Job, и Quartz позаботится об этом. 2. Как мы можем определить, какой сервер обрабатывает сообщения из очереди?

Проблема в том, что методы, которые используют сообщения очереди, в конечном итоге записывают несколько строк в файл .csv, который находится в NFS.

Изначально у нас было проблем , с фактической записью в файл .csv. В файле было несколько «сломанных» наполовину записанных строк, но мы решили эту проблему, добавив @Synchronized к методу, который записал в файл .csv. Теперь проблема в том, что некоторые строки просто не пишутся вообще.

Есть идеи? Я не уверен, является ли это проблемой программирования, и если да, то как ее можно решить, или это архитектурная проблема.

UPD : @ Олександр
Изначально у меня был (пример) def getStringsToWrite(File file, List someOtherList) { def stringsList = [] someOtherList.each { def someString = "someString" stringsList << someString } writeRowsToFile(file, stringsList) }</p> <p>@Synchronized def writeRowsToFile(File file, List stringsList) { file.withWriterAppend {out-> stringsList.each {row-> out.writeLine row } } }

И он не работал "должным образом"

Теперь я изменил код на что-то вроде:

class someServiceClass { //singleton
  <strong>LinkedBlockingQueue csvWritingQueue = new LinkedBlockingQueue()</strong>  </p>

<p>def getStringsToWrite(File file, List someOtherList) {
    def stringsList = []
    someOtherList.each {
      def someString = "someString"
      <strong>csvWritingQueue.put(someString)</strong>
    }
    writeRowsToFile(<strong>file</strong>)
  }</p>

<p>@Synchronized
  def writeRowsToFile(File file) {
    file.withWriterAppend {out->
      <strong>while (!csvWritingQueue.isEmpty()) {</strong>
        <strong>out.writeLine csvWritingQueue.poll()</strong> 
      }
    }
  }
}

Пожалуйста, дайте мне знать, если вам нужна дополнительная информация. Я пытался держать его на высоком уровне, но я буду более чем рад включить в пост какой-то код.

Заранее спасибо,
Ираклис

Ответы [ 2 ]

0 голосов
/ 08 марта 2011

Вы спрашиваете о том, как настроить кварц в кластере? http://www.quartz -scheduler.org / документы / конфигурация / ConfigJDBCJobStoreClustering.html

0 голосов
/ 02 марта 2011

Я предполагаю, что это может произойти, потому что некоторые потоки (обработчики) приостанавливаются из-за тайм-аута в ожидании синхронизированной блокировки.

Но это только мои догадки, и это зависит от архитектуры приложения (как вы организовали очереди, обработчики и т. Д.)

UPD: У меня есть какая-то идея. Запись в файл CSV не является поточно-ориентированной, поэтому вы используете синхронизацию. И, возможно, именно поэтому некоторые темы приостановлены во время ожидания. Но мы могли бы легко решить эту проблему, создав класс WriteManager . Этот класс должен иметь экземпляр BlockingQueue , чтобы обработчики не вызывали метод write напрямую, но помещали данные для записи в Очередь (если очередь заполнена, они имеют ждать, но если очередь будет достаточно большой, они вообще не будут ждать), а класс менеджера будет опрашивать очередь в бесконечный цикл (конечно, он будет конечным и будет контролироваться флагом ) и записать данные в файл CSV. Поэтому, когда вам нужно начать писать \ управлять процессом, вы просто запускаете WriteManager , а когда запись больше не требуется, вы просто отключаете цикл опроса (на самом деле цикл не должен останавливаться, когда вы останавливаете его вручную, но он должен опросить все объекты из очереди и только потом остановиться).

UPD2: Вы вызываете getStringsToWrite несколько раз? Это очень простой пример того, что я имею в виду, но вы можете легко расширить его или даже полностью переписать.

class someServiceClass { //singleton
  LinkedBlockingQueue csvWritingQueue = new LinkedBlockingQueue()
  def currentWriter = null
  def writerThread = null

def getStringsToWrite(List someOtherList) {
    def stringsList = []
    someOtherList.each {
      def someString = "someString"
      csvWritingQueue.put(someString)
    }
  }


@Synchronized
  def writeRowsToFile(File file) {
    file.withWriterAppend {out->
      while (!csvWritingQueue.isEmpty()) {
        out.writeLine csvWritingQueue.poll() 
      }
    }
  }

 def write (File file) {
   if (writerThread == null) {
   currentWriter = new Runnable() {
      boolean isRun = true
      public void run() {
         while(isRun) {
            if(!csvWritingQueue.isEmpty()) {
               writeRowsToFile(file)
            }
           try {
            Thread.sleep(5 * 1000);
           }catch(e){
            //e.printStackTrace()
           }
         }
      }
   }
   writerThread = new Thread(currentWriter)
  writerThread.start()
  }
 }

def stop() {
  if (currentWriter != null) {
    currentWriter.isRun = false;
   try {
    writerThread.join();
   }catch(e){
    //e.printStackTrace()
   }
  }
 currentWriter = null;
 writerThread = null;
}

   }
}

Надеюсь, это поможет вам.

Плагин BackgroundThread - имеет хороший пример фонового работника.

...