Передача рабочего элемента между потоками (Java) - PullRequest
5 голосов
/ 06 февраля 2009

У меня есть две темы. Производитель создает фрагменты данных (объекты String), где потребитель обрабатывает эти строки. Суть в том, что моему приложению требуется только самый последний объект данных для обработки. Другими словами, если производителю удалось создать две строки "s1", а затем "s2", я хочу, чтобы потребитель обрабатывал только "s2". «s1» можно безопасно отбросить.

Конечно, нет проблем с реализацией класса, который реализует это поведение, но я хочу использовать стандартный механизм из java.util.concurrent (если такой механизм существует). Обратите внимание, что SynchronousQueue не является хорошим решением: потребитель будет блокировать при постановке в очередь "s1" и не получит возможность произвести "s2".

(Короче, я ищу одноэлементную коллекцию с блокирующей операцией удаления и неблокирующей операцией установки)

Есть идеи?

Ответы [ 4 ]

3 голосов
/ 06 февраля 2009

Я думаю, что ваш лучший ответ, вероятно, заключается в использовании ArrayBlockingQueue, где производитель (у вас есть только один производитель, верно?) Удаляет любой существующий элемент перед добавлением нового элемента.

Конечно, в этой реализации есть гоночные условия: потребитель может начать обработку элемента непосредственно перед тем, как производитель удалит его. Но эти условия будут существовать всегда, независимо от того, какую структуру данных вы используете.

3 голосов
/ 06 февраля 2009

А как насчет класса Exchanger ? Это стандартный способ обмена объектами между потоками. Специализируйте его с вашим классом, может быть список строк. Заставьте потребителя использовать только первый / последний.

0 голосов
/ 07 февраля 2009

Что ж, если вам нужна только последняя созданная строка, то вам вообще не нужна очередь - все, что вам нужно, - это ссылка на строку: производитель устанавливает ее, а потребитель читает ее. Если потребитель читает так долго, что производитель переустанавливает его ... ну и что?

Установка и чтение ссылок являются атомарными. Единственная проблема - если вы хотите, чтобы потребитель как-то был уведомлен о наличии доступной строки. Но даже тогда ... если потребитель делает что-то, что занимает какое-то время, тогда вам действительно не нужны какие-то модные штучки из библиотек параллелизма.

Обратите внимание, что этот пример работает с любым количеством потоков производителей и / или потребителей.

import java.util.Random;

public class Example {
    public static void main(String[] av) {
        new Example().go();
    }

    Object  mutex       = new Object();
    String  theString   = null;

    void go() {
        Runnable producer = new Runnable() {
            public void run() {
                Random rnd = new Random();
                try {
                    for (;;) {
                        Thread.sleep(rnd.nextInt(10000));
                        synchronized (mutex) {
                            theString = "" + System.currentTimeMillis();
                            System.out.println("Producer: Setting string to " + theString);
                            mutex.notify();
                        }
                    }
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }

            }
        };

        Runnable consumer = new Runnable() {
            public void run() {
                try {
                    String mostRecentValue = null;
                    Random rnd = new Random();
                    for (;;) {
                        synchronized (mutex) {
                            // we use == because the producer
                            // creates new string
                            // instances
                            if (theString == mostRecentValue) {
                                System.out.println("Consumer: Waiting for new value");
                                mutex.wait();
                                System.out.println("Consumer: Producer woke me up!");
                            } else {
                                System.out.println("Consumer: There's a new value waiting for me");
                            }
                            mostRecentValue = theString;
                        }
                        System.out.println("Consumer: processing " + mostRecentValue);
                        Thread.sleep(rnd.nextInt(10000));
                    }
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        };


        new Thread(producer).start();
        new Thread(consumer).start();
    }
}
0 голосов
/ 06 февраля 2009

Для этого вы можете использовать массив размером один:

String[] oeq = new String[1];

Источник выборки:

public class Test {
    private static final String[] oeq = new String[1];
    public static void main(String[] args) {
        (new Producer()).start();
        (new Consumer()).start();
        (new Consumer()).start();
        (new Consumer()).start();
        (new Consumer()).start();
        (new Consumer()).start();
        (new Consumer()).start();
    }

    private static class Producer extends Thread {
        public void run() {
            int i=0;
            while(true) {
                i++;
                synchronized(oeq) {
                    oeq[0] = ""+i;
                    oeq.notifyAll();
                }
            }
        }
    }

    private static class Consumer extends Thread {
        public void run() {
            String workload = null;
            while(true) {
                synchronized(oeq) {
                    try {
                        oeq.wait();
                    } catch(InterruptedException ie) {
                        ie.printStackTrace();
                    }
                    if(oeq[0] != null) {
                        workload = oeq[0];
                        oeq[0] = null;
                    }
                }
                if(workload != null) {
                    System.out.println(workload);
                }
            }
        }
    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...