Как создать потокобезопасный блокирующий поток в Java? - PullRequest
1 голос
/ 11 мая 2019

Я хочу создать java.util.stream.Stream, который блокирует действия терминала и использует произвольный объект для синхронизации.Методы Stream должны делать это прозрачным способом, чтобы я мог безопасно передать Stream в код, который не знает о синхронизации.

Рассмотрим следующий пример:

void libraryMethod(Stream<Whatever> s) {
    for (int i = 0; i < 10000000; ++i) { /* ... */ }
    s.filter(Library::foo).forEach(Library::bar);
}

/* Elsewhere in my code */

Set<Whatever> aSet = Collections.synchronizedSet(...);
/* ... */
libraryMethod(new MyBlockingStream<>(set.stream(), set));

Передвыполняя forEach, я хочу, чтобы блокировка aSet была получена самой MyBlockingStream и снята только после завершения forEach.Это должно гарантировать, что я не получу ConcurrentModificationException s, потому что другие потоки могут захотеть изменить набор.Я не могу использовать synchronized (aSet) для всего libraryMethod, потому что это заблокировало бы aSet гораздо дольше, чем необходимо.

Возможно ли это сделать?Если да, существуют ли какие-либо реализации, которые делают это, или я должен написать это сам?

Примечание: этот вопрос не имеет ничего общего с тем, как Stream выполняет действия - мне все равно, параллельна ли онаили нет.Я знаю, что существуют по сути несинхронизируемые iterator() и spliterator() методы.Я тоже не забочусь о них.

Ответы [ 2 ]

0 голосов
/ 14 мая 2019

Вот что я в итоге сделал: https://gist.github.com/OLEGSHA/bda28ffaa4b24e64b94a8c30c3ad9b0c. Эти обертки Stream synchronize все терминальные операции, использующие предоставленный объект и обтекание потоков из промежуточных операций.

По сути это

public void forEach(Consumer<? super T> action) {
    synchronized (monitor) {
        parent.forEach(action);
    }
}

для каждой операции терминала в интерфейсе Stream.IntStream, LongStream и DoubleStream версии включены.

Я думаю, что должно быть лучшее решение.

0 голосов
/ 11 мая 2019

Вы можете использовать блокировку.

public class Example {
    public static ReentrantLock lock = new ReentrantLock();

    private static void sleep() {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private static Runnable createRunnable() {
        return () -> {
            try {
                Arrays.asList("a", "b", "c").stream().forEach(e -> {
                    if (!lock.isHeldByCurrentThread())
                        lock.lock();

                    sleep();
                    System.out.println(String.format("thread %s with element %s", Thread.currentThread().getId(), e));
                });
            } finally {
                if(lock.isHeldByCurrentThread())
                    lock.unlock();
            }
        };
    }

    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(createRunnable());
        Thread t2 = new Thread(createRunnable());
        Thread t3 = new Thread(createRunnable());

        t1.start();
        t2.start();
        t3.start();

        System.out.println("join all threads");
        t1.join();
        t2.join();
        t3.join();
    }
}

Поток, который первым достигнет forEach, заблокирует все остальные потоки.

В этом случае вывод

join all threads
thread 16 with element a
thread 16 with element b
thread 16 with element c
thread 15 with element a
thread 15 with element b
thread 15 with element c
thread 14 with element a
thread 14 with element b
thread 14 with element c

EDIT 1

Как указывал @Holger, это не будет работать, если поток будет вложенным.Внутренний поток освободит блокировку слишком рано.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...