Является ли JMS ответом на необходимость постоянной очереди блокировки? - PullRequest
1 голос
/ 15 ноября 2010

Я создаю библиотеку, которая состоит из приложения Log4J, которое асинхронно отправляет события на удаленный сервер. Когда выполняется запись в журнал, appender асинхронно записывает событие в локальную очередь, которую пул потребителей затем извлекает и отправляет на удаленный сервер.

Полностью находящимся в памяти решением было бы создание BlockingQueue, которое решило бы проблему параллелизма. Однако я бы хотел, чтобы очередь сохранялась, чтобы, если удаленный сервер недоступен, я не увеличивал неограниченную очередь или не начинал отбрасывать сообщения в случае ограниченной очереди.

Я думал об использовании встроенной базы данных H2 для локального хранения событий, а затем использовал механизм опроса для извлечения событий и отправки на удаленный компьютер. Я бы предпочел использовать BlockingQueue, чем опрашивать таблицу базы данных.

Является ли JMS ответом?

EDIT:

Если ответом является JMS, и похоже, что так и будет, есть ли у кого-нибудь рекомендации относительно легкого встраиваемого решения JMS, которое можно настроить для приема только сообщений в процессе? Другими словами, я не хочу и, возможно, не буду иметь права открывать сокет TCP для прослушивания.

EDIT:

У меня сейчас есть ActiveMQ, и, похоже, он работает. Спасибо всем.

Ответы [ 4 ]

2 голосов
/ 16 ноября 2010

Боб Ли с открытым исходным кодом создал очень простую очередь на диске с резервированием, https://github.com/square/retrofit/blob/master/modules/android/src/retrofit/io/QueueFile.java - может быть полезным, и, конечно, его гораздо проще внедрить, чем JMS, если вы можете принять локальную стойкость.

Этот класс является автономным - его можно скопировать и вставить.

1 голос
/ 15 ноября 2010

Вы можете определенно использовать JMS для этой цели. Насколько я понимаю, вы используете приложение JMS Log4J. Этот компонент отправляет сообщения в предварительно сконфигурированное место назначения JMS (обычно очередь). Вы можете настроить эту очередь на сохранение. В этом случае все сообщения, вставленные в очередь, будут автоматически сохранены в каком-либо постоянном хранилище (обычно в базе данных). К сожалению, эта конфигурация зависит от поставщика (зависит от поставщика JMS), но обычно очень проста. Пожалуйста, обратитесь к документации вашего провайдера JMS.

1 голос
/ 15 ноября 2010

Вы можете использовать JMS для асинхронной отправки сообщений на удаленный компьютер (конечно, при условии, что он может их получать), Log4j имеет приложение JMS Appender, которое вы можете использовать для этого.

0 голосов
/ 19 января 2016

Проверьте, работает ли это

Этот код должен работать для вас - его очередь для постоянной блокировки в памяти - требует некоторой настройки файла, но он должен работать

       package test;

     import java.io.BufferedReader;
     import java.io.BufferedWriter;
     import java.io.File;
     import java.io.FileReader;
     import java.io.FileWriter;
     import java.io.IOException;
     import java.util.ArrayList;
     import java.util.Collections;
     import java.util.LinkedList;
     import java.util.List;

     public class BlockingQueue {

    //private static Long maxInMenorySize = 1L;
    private static Long minFlushSize = 3L;

    private static String baseDirectory = "/test/code/cache/";
    private static String fileNameFormat = "Table-";

    private static String  currentWriteFile = "";

    private static List<Object>  currentQueue = new LinkedList<Object>();
    private static List<Object>  lastQueue = new LinkedList<Object>();

    static{
        try {
            load();
        } catch (IOException e) {
            System.out.println("Unable To Load");
            e.printStackTrace();
        }
    }

    private static void load() throws IOException{
        File baseLocation = new File(baseDirectory);
        List<String> fileList = new ArrayList<String>();

        for(File entry : baseLocation.listFiles()){
            if(!entry.isDirectory() && entry.getName().contains(fileNameFormat)){
                fileList.add(entry.getAbsolutePath());
            }
        }

        Collections.sort(fileList);

        if(fileList.size()==0){
            //currentQueue = lastQueue = new ArrayList<Object>();
            currentWriteFile = baseDirectory + "Table-1";
            BufferedWriter writer = new BufferedWriter(new FileWriter(currentWriteFile));
            while (!lastQueue.isEmpty()){
                writer.write(lastQueue.get(0).toString()+ "\n");
                lastQueue.remove(0);
            }
            writer.close();
        }else{
            if(fileList.size()>0){
                    BufferedReader reader = new BufferedReader(new FileReader(fileList.get(0)));
                    String line=null;
                    while ((line=reader.readLine())!=null){
                        currentQueue.add(line);
                    }
                    reader.close();
                    File toDelete = new File(fileList.get(0));
                    toDelete.delete();
            }

            if(fileList.size()>0){
                BufferedReader reader = new BufferedReader(new FileReader(fileList.get(fileList.size()-1)));
                currentWriteFile = fileList.get(fileList.size()-1);
                String line=null;
                while ((line=reader.readLine())!=null){
                    lastQueue.add(line);
                }
                reader.close();
                //lastFileNameIndex=Long.parseLong(fileList.get(fileList.size()).substring(6, 9));
            }
        }

    }

    private void loadFirst() throws IOException{
        File baseLocation = new File(baseDirectory);
        List<String> fileList = new ArrayList<String>();

        for(File entry : baseLocation.listFiles()){
            if(!entry.isDirectory() && entry.getName().contains(fileNameFormat)){
                fileList.add(entry.getAbsolutePath());
            }
        }

        Collections.sort(fileList);

        if(fileList.size()>0){
                BufferedReader reader = new BufferedReader(new FileReader(fileList.get(0)));
                String line=null;
                while ((line=reader.readLine())!=null){
                    currentQueue.add(line);
                }
                reader.close();
                File toDelete = new File(fileList.get(0));
                toDelete.delete();
        }
    }

    public Object pop(){
        if(currentQueue.size()>0)
            return  currentQueue.remove(0);

        if(currentQueue.size()==0){
            try {
                loadFirst();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }

        if(currentQueue.size()>0)
            return  currentQueue.remove(0);
        else
            return null;
    }

    public synchronized Object waitTillPop() throws InterruptedException{
        if(currentQueue.size()==0){
            try {
                loadFirst();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            if(currentQueue.size()==0)
                wait();
        }
        return currentQueue.remove(0);
    }

    public synchronized void push(Object data) throws IOException{
        lastQueue.add(data);
        this.notifyAll();
        if(lastQueue.size()>=minFlushSize){
            BufferedWriter writer = new BufferedWriter(new FileWriter(currentWriteFile));
            while (!lastQueue.isEmpty()){
                writer.write(lastQueue.get(0).toString() + "\n");
                lastQueue.remove(0);
            }
            writer.close();

            currentWriteFile  = currentWriteFile.substring(0,currentWriteFile.indexOf("-")+1) + 
                    (Integer.parseInt(currentWriteFile.substring(currentWriteFile.indexOf("-")+1,currentWriteFile.length())) + 1);
        }
    }

    public static void main(String[] args) {
        try {
            BlockingQueue bq = new BlockingQueue();

            for(int i =0 ; i<=8 ; i++){
                bq.push(""+i);
            }

            System.out.println(bq.pop());
            System.out.println(bq.pop());
            System.out.println(bq.pop());

            System.out.println(bq.waitTillPop());
            System.out.println(bq.waitTillPop());
            System.out.println(bq.waitTillPop());
            System.out.println(bq.waitTillPop());



        } catch (Exception e) {
            e.printStackTrace();
        }
    }
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...