Допустим, у меня есть экземпляр ExecutorService от одного из фабричных методов Executors c.
Если я отправляю Callable, где RetVal не является потокобезопасным, локально созданным объектом из некоторого потока, выполните Мне нужно беспокоиться о целостности RetVals, когда я получаю () его из того же потока? Люди говорят, что локальные переменные являются поточно-ориентированными, но я не уверен, применимо ли это, когда вы возвращаете объект, созданный локально, и получаете его из другого потока.
Вот пример, аналогичный моей ситуации:
ExecutorService executor = Executors.newFixedThreadPool(5);
Future<List<String>> fut = executor.submit(() -> {
List<String> ret = new ArrayList<>();
ret.add("aasdf");
ret.add("dfls");
return ret;
});
List<String> myList = fut.get();
В приведенном выше примере я извлекаю ArrayList, который был создан в другом потоке - созданном executor. Я не думаю, что приведенный выше код является потокобезопасным, но я не смог найти много информации относительно моей конкретной ситуации c.
Теперь я попробовал вышеуказанный код на своем компьютере, и он фактически вернул ожидаемый результат 100 % времени я пробовал это, и я даже пытался с моей собственной реализацией ExecutorService, и до сих пор я получил только ожидаемые результаты. Так что, если мне не повезло, я почти уверен, что это работает, но я не уверен, как. Я создал не потокобезопасный объект в другом потоке и получил его в другом; разве у меня не должно быть возможности получить частично сконструированный объект - в моем случае список, который не содержит 2 строки?
Ниже приведена моя пользовательская реализация, которую я сделал просто для тестирования. Вы можете игнорировать перечисление EType.
class MyExecutor {
enum EType {
NoHolder, Holder1, Holder2
}
private ConcurrentLinkedQueue<MyFutureTask<?>> tasksQ;
private final Thread thread;
private final EType eType;
public MyExecutor(EType eType) {
eType = Objects.requireNonNull(eType);
tasksQ = new ConcurrentLinkedQueue<>();
thread = new Thread(new MyRunnable());
thread.start();
}
public <T> Future<T> submit(Callable<T> c) {
MyFutureTask<T> task = new MyFutureTask<T>(c, eType);
tasksQ.add(task);
return task;
}
class MyRunnable implements Runnable {
@Override
public void run() {
while (true) {
if (tasksQ.isEmpty()) {
try {
Thread.sleep(1);
continue;
} catch (InterruptedException ite) {
Thread.interrupted();
break;
}
}
MyFutureTask<?> task = tasksQ.poll();
try {
task.run();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
class MyFutureTask<T> implements RunnableFuture<T> {
final Callable<?> cb;
volatile Object outcome;
static final int STATE_PENDING = 1;
static final int STATE_EXECUTING = 2;
static final int STATE_DONE = 3;
final AtomicInteger atomicState = new AtomicInteger(STATE_PENDING);
final EType eType;
public MyFutureTask(Callable<?> cb, EType eType) {
cb = Objects.requireNonNull(cb);
eType = Objects.requireNonNull(eType);
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
throw new NotImplementedException();
}
@Override
public boolean isCancelled() {
return false;
}
@Override
public boolean isDone() {
return atomicState.get() == STATE_DONE;
}
@SuppressWarnings("unchecked")
@Override
public T get() throws InterruptedException, ExecutionException {
while (true) {
switch (atomicState.get()) {
case STATE_PENDING:
case STATE_EXECUTING:
// Thread.sleep(1);
break;
case STATE_DONE:
return (T)outcome;
default:
throw new IllegalStateException();
}
}
}
@Override
public T get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
throw new NotImplementedException();
}
void set(T t) {
outcome = t;
}
@Override
public void run() {
if (atomicState.compareAndSet(STATE_PENDING, STATE_EXECUTING)) {
Object result;
try {
switch (eType) {
case NoHolder:
result = cb.call();
break;
case Holder1:
throw new NotImplementedException();
case Holder2:
throw new NotImplementedException();
default:
throw new IllegalStateException();
}
} catch (Exception e) {
e.printStackTrace();
result = null;
}
outcome = result;
atomicState.set(STATE_DONE);
}
}
}
}
class MyTask implements Callable<List<Integer>> {
@Override
public List<Integer> call() throws Exception {
List<Integer> ret = new ArrayList<>(100);
IntStream.range(0, 100).boxed().forEach(ret::add);
return ret;
}
}