ExecutorService поток безопасности - PullRequest
1 голос
/ 08 января 2020

Допустим, у меня есть экземпляр ExecutorService от одного из фабричных методов Executors c.

Если я отправляю Callable, где RetVal не является потокобезопасным, локально созданным объектом из некоторого потока, выполните Мне нужно беспокоиться о целостности RetVals, когда я получаю () его из того же потока? Люди говорят, что локальные переменные являются поточно-ориентированными, но я не уверен, применимо ли это, когда вы возвращаете объект, созданный локально, и получаете его из другого потока.

Вот пример, аналогичный моей ситуации:

ExecutorService executor = Executors.newFixedThreadPool(5);
Future<List<String>> fut = executor.submit(() -> {
    List<String> ret = new ArrayList<>();
    ret.add("aasdf");
    ret.add("dfls");
    return ret;
});

List<String> myList = fut.get();

В приведенном выше примере я извлекаю ArrayList, который был создан в другом потоке - созданном executor. Я не думаю, что приведенный выше код является потокобезопасным, но я не смог найти много информации относительно моей конкретной ситуации c.

Теперь я попробовал вышеуказанный код на своем компьютере, и он фактически вернул ожидаемый результат 100 % времени я пробовал это, и я даже пытался с моей собственной реализацией ExecutorService, и до сих пор я получил только ожидаемые результаты. Так что, если мне не повезло, я почти уверен, что это работает, но я не уверен, как. Я создал не потокобезопасный объект в другом потоке и получил его в другом; разве у меня не должно быть возможности получить частично сконструированный объект - в моем случае список, который не содержит 2 строки?

Ниже приведена моя пользовательская реализация, которую я сделал просто для тестирования. Вы можете игнорировать перечисление EType.

class MyExecutor {

    enum EType {
        NoHolder, Holder1, Holder2
    }

    private ConcurrentLinkedQueue<MyFutureTask<?>> tasksQ;
    private final Thread thread;

    private final EType eType;

    public MyExecutor(EType eType) {
        eType = Objects.requireNonNull(eType);

        tasksQ = new ConcurrentLinkedQueue<>();
        thread = new Thread(new MyRunnable());
        thread.start();
    }

    public <T> Future<T> submit(Callable<T> c) {
        MyFutureTask<T> task = new MyFutureTask<T>(c, eType);
        tasksQ.add(task);
        return task;
    }

    class MyRunnable implements Runnable {
        @Override
        public void run() {
            while (true) {
                if (tasksQ.isEmpty()) {
                    try {
                        Thread.sleep(1);
                        continue;
                    } catch (InterruptedException ite) {
                        Thread.interrupted();
                        break;
                    }
                }

                MyFutureTask<?> task = tasksQ.poll();
                try {
                    task.run();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    class MyFutureTask<T> implements RunnableFuture<T> {

        final Callable<?> cb;
        volatile Object outcome;

        static final int STATE_PENDING = 1;
        static final int STATE_EXECUTING = 2;
        static final int STATE_DONE = 3;

        final AtomicInteger atomicState = new AtomicInteger(STATE_PENDING);

        final EType eType;

        public MyFutureTask(Callable<?> cb, EType eType) {
            cb = Objects.requireNonNull(cb);
            eType = Objects.requireNonNull(eType);
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            throw new NotImplementedException();
        }

        @Override
        public boolean isCancelled() {
            return false;
        }

        @Override
        public boolean isDone() {
            return atomicState.get() == STATE_DONE;
        }

        @SuppressWarnings("unchecked")
        @Override
        public T get() throws InterruptedException, ExecutionException {
            while (true) {
                switch (atomicState.get()) {
                case STATE_PENDING:
                case STATE_EXECUTING:
//                      Thread.sleep(1);
                    break;
                case STATE_DONE:
                    return (T)outcome;
                default:
                    throw new IllegalStateException();
                }
            }
        }

        @Override
        public T get(long timeout, TimeUnit unit)
                throws InterruptedException, ExecutionException, TimeoutException {
            throw new NotImplementedException();
        }

        void set(T t) {
            outcome = t;
        }

        @Override
        public void run() {
            if (atomicState.compareAndSet(STATE_PENDING, STATE_EXECUTING)) {
                Object result;
                try {
                    switch (eType) {
                    case NoHolder:
                        result = cb.call();
                        break;
                    case Holder1:
                        throw new NotImplementedException();
                    case Holder2:
                        throw new NotImplementedException();
                    default:
                        throw new IllegalStateException();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    result = null;
                }

                outcome = result;
                atomicState.set(STATE_DONE);
            }
        }
    }
}

class MyTask implements Callable<List<Integer>> {
    @Override
    public List<Integer> call() throws Exception {
        List<Integer> ret = new ArrayList<>(100);
        IntStream.range(0, 100).boxed().forEach(ret::add);
        return ret;
    }
}

Ответы [ 2 ]

4 голосов
/ 08 января 2020

Важно то, что происходит до отношений. Из ExecutorService API-документов:

Эффекты согласованности памяти: действия в потоке перед отправкой задачи Runnable или Callable в ExecutorService случай-до любые действия, предпринятые этой задачей, которые, в свою очередь, происходят до , результат получается через Future.get().

Таким образом, вы можете безопасно передавать изменяемый объект, подобный этому , Реализация ExecutorService передает объект через некоторую форму безопасной публикации .

Очевидно, не обновляйте объект в исходном потоке после его возвращения.

Если вы должны были общаться между потоками, сохраняя в общем не volatile поле, тогда это было бы небезопасно.

2 голосов
/ 08 января 2020

Безопасность потоков становится проблемой, когда несколько потоков пытаются одновременно получить доступ и изменить одно и то же состояние.

Обратите внимание, что вы не получите фактический результат от Future до тех пор, пока задача не будет завершена (т.е. Future#get не вернется, пока задача не будет завершена).

В вашем первом примере безопасность потока не является проблемой, поскольку новый объект (в то время как изменяемый) создается одним потоком (поток, созданный Исполнитель) и извлекается из объекта Future, как только этот поток завершил обработку задачи. Как только вызывающий поток получает объект, он не может быть изменен каким-либо другим потоком, потому что создающий поток больше не имеет доступа к списку.

...