Реализация многопользовательской очереди без блокировки - PullRequest
0 голосов
/ 01 мая 2019

Недавно я наткнулся на этот вопрос: скажем, 3 пользовательских потока, и необходимо реализовать очередь без блокировки (не может использовать синхронизация ), чтобы не потреблял потокзаблокирован.Предположим, что очередь уже содержит данные.

Я немного подумал об этом и натолкнулся на Атомные операции, которые при правильном использовании могут помочь.Моя реализация, как показано ниже.Поскольку данные уже находятся в очереди, я не реализовал метод enqueue и не заполнил массив внутри конструктора.

public class SPMCQueue {

    private AtomicInteger index = new AtomicInteger(0);

    public int[] arr;

    public SPMCQueue(int size) {
        arr = IntStream.range(0, size).toArray();
    }

    public Integer dequeue() {
        Integer ret = null;
        int x = index.getAndIncrement();
        if (x < arr.length) {
            ret = arr[x];
            System.out.println(arr[x] + " by " + Thread.currentThread().getName());
        }
        else {
            throw new RuntimeException("Queue is empty");
        }
        return ret;
    }
}

class QueueTest {
    public static void main(String[] args) {

        SPMCQueueq = new SPMCQueue(40);

        Runnable t1 = () -> {
            try {
            while (true) {
                q.dequeue();
            }
            }catch(Exception e) {

            }
        };

        Runnable t2 = () -> { 
            try {
            while(true) { q.dequeue(); }
            }catch(Exception e) {

            }
        };

        Runnable r3 = () -> { 

            try {
                while(true) { q.dequeue(); }
            } catch (Exception e) {
                // TODO Auto-generated catch block
                //e.printStackTrace();
            }

        };

        Thread thread1 = new Thread(t1);
        Thread thread2 = new Thread(t2);
        Thread thread3 = new Thread(r3);

        thread1.start();
        thread2.start();
        thread3.start();

    }
}

Я выполнил вышеуказанную программу, и результат показывает, что все 3 потребителя потребляютданные хотя и не в порядке, и некоторые потоки потребляют больше данных, чем другие потоки, но я не вижу, чтобы какие-либо данные появлялись несколько раз в o / p.

У меня есть следующие вопросы:

  1. Есть ли какие-либо проблемы в приведенной выше реализации?

  2. Каковы другие способыреализовать очередь потребителей без блокировки?

1 Ответ

0 голосов
/ 01 мая 2019

Я хочу дать свой ответ в тандеме с ответом: https://stackoverflow.com/a/21101904/7340499, поскольку этот вопрос похож на ваш.

Итак, ваши вопросы:

но я не вижу никаких данных, появляющихся несколько раз в o / p.

Это ожидается. Поскольку getAndIncrement () является атомарным, и каждый раз, когда к этой функции обращаются, вы получите другое значение x, следовательно, другой вывод. Однако , из-за объединения функций «getAndIncrement ()» и «System.out.print ()» в одной неатомарной операции удаления очереди, ваш вывод может иногда быть не в порядке, например, вы получаете x = 1 в одном потоке, а другой поток прерывает, получает x = 2 и печатает его, а затем ваш исходный поток завершает печать. Я полагаю, что это также указывает на проблемы вашей реализации, как указано в (1). Ваша заявка в порядке, очередь обрабатывается не по порядку?

Как еще можно реализовать очередь потребителей без блокировки?

Ну, атомные операции - это один из способов, как вы заметили. Но, по сути, они все равно очень похожи на замки. Они просто работают в меньшем масштабе (хотя есть некоторые отличия в реализации). Поэтому трудно представить их как разные методы, по крайней мере, для себя. Кроме этого, я считаю, что здесь есть несколько полезных ресурсов: Lock Free Queue - один производитель, несколько потребителей

...