Может ли быть нарушена безопасность потока нижеуказанного класса; Я уверен, что это не может быть, но просто хотел быть уверен вдвойне, поскольку это не легко проверить - PullRequest
2 голосов
/ 07 июня 2011
public class ThreadSafe implements ITaskCompletionListener {

private final Set<String> taskIds = new HashSet<String>();
private final Set<String> successfulIds = new HashSet<String>();
private final Set<String> cancelledIds = new HashSet<String>();
private final Set<String> errorIds = new HashSet<String>();

public ThreadSafe() {

}

// invoked concurrently
@Override
public void onCancel(String pTaskId) {
    remove(pTaskId);
    cancelledIds.add(pTaskId);
}

// invoked concurrently
@Override
public void onError(String pTaskId) {
    remove(pTaskId);
    errorIds.add(pTaskId);
}

// invoked concurrently
@Override
public void onSuccess(String pTaskId) {
    remove(pTaskId);
    successfulIds.add(pTaskId);
}

private void remove(String pTaskId) {
    taskIds.remove(pTaskId);
}

}

Ответы [ 7 ]

8 голосов
/ 07 июня 2011

Из документов HashSet:

Обратите внимание, что эта реализация не синхронизирована .Если несколько потоков обращаются к хэш-набору одновременно, и хотя бы один из потоков изменяет набор, он должен быть синхронизирован извне

Так что нет, ваш код не является потокобезопасным,Одновременный доступ к любому из ваших методов может привести к странным результатам.

4 голосов
/ 08 июня 2011

Вместо того, чтобы иметь множество наборов и передавать идентификаторы между коллекциями, вы можете использовать одну коллекцию, которая является поточно-ориентированной.

private final ConcurrentMap<String, State> idState = new ConcurrentHashMap<String, State>();
enum State { TASK, SUCCESS, CANCELLED, ERROR }

public void onSuccess(String taskId) {
    idState.put(taskId, State.SUCCESS);
}

public void onCancelled(String taskId) {
    idState.put(taskId, State.CANCELLED);
}

public void onError(String taskId) {
    idState.put(taskId, State.ERROR);
}

public void remove(String taskId) {
    idState.remove(taskId);
}
1 голос
/ 08 июня 2011

Этот код полон проблем с безопасностью потоков.

HashSet не является поточно-ориентированным (он основан на не-поточно-безопасном HashMap - условиях гонки, в которых могут возникнуть бесконечные циклы ).

Во-вторых, не существует атомарности между идентификатором, находящимся в наборе taskIds, и его добавлением к одному из других, поэтому существование задачи эфемерно.

В-третьих, код неявно предполагает, что состояние задачи просто inProgress -> success | error | cancel и что одновременное выполнение задачи не выполняется. Если это не так, то код завершается ошибкой.

1 голос
/ 08 июня 2011

Питер прав. «Вместо того, чтобы иметь множество наборов и передавать идентификаторы между коллекциями, вы можете использовать одну коллекцию, которая является поточно-ориентированной».

Это сделало бы его немного менее сложным и выглядело бы нелегко сломаться.

1 голос
/ 07 июня 2011

Методы onError, onSuccess и onCancel могут быть вызваны параллельно двумя или более потоками, которые могут в конечном итоге вызывать taskIds.remove параллельно, что НЕ является потокобезопасным.

Просто пометьте эти три метода как synchronized, и все готово.

0 голосов
/ 08 июня 2011

Спасибо каждому из вас за то, что вы ответили, а также за то, что указали на очевидные ошибки в коде, который я вставил.Я дам контекст проблемы, которую я решаю.

Я буду получать запросы (каждый запрос будет иметь набор идентификаторов задач);У меня есть пул потоков, который будет использоваться для отправки этого запроса (пакет задач для каждого запроса).Я буду принимать от клиентов слушателей, которые будут вызваны после завершения задачи.В текущем контексте я мог бы использовать ExecutorCompletionService для ожидания выполнения всех задач, соответствующих запросу, однако я не хочу делать какие-либо блокирующие вызовы.

Вместо этого я хочу создать подкласс FutureTask (переопределить done() метод), и это будет реализовано в newTaskFor() моей службы исполнителя подклассов.Все будущие задачи, соответствующие одному запросу, будут иметь общий TaskCompletionListener, а в методе переопределения done() вызовет один из success(), error() или отменится.TaskCompletionListener будет инициализирован с начальным набором идентификатора задачи (количество задач) и для каждого вызова будет уменьшать счет на единицу, после чего будет сообщено о завершении.

Вот измененная версиякод, я надеюсь, что на этот раз я не совершил много ошибок (и да, еще раз спасибо за все ответы в прошлый раз).

public class TaskCompletionListener implements ITaskCompletionListener {

private final AtomicInteger taskCount;

private final IProcessCompletionListener completionListener;

public TaskCompletionListener(final HashSet<String> pTaskIds, IProcessCompletionListener pCompletionListener) {
    taskCount = new AtomicInteger(pTaskIds.size());
    completionListener = pCompletionListener;
}

// invoked concurrently
@Override
public void onCancel(String pTaskId) {
    checkCompletion();
}

// invoked concurrently
@Override
public void onError(String pTaskId) {
    checkCompletion();
}

// invoked concurrently
@Override
public void onSuccess(String pTaskId) {
    checkCompletion();
}

private void checkCompletion() {
              // method level confinement  
    int currentCount = taskCount.decrementAndGet();
    if (currentCount == 0) {
        completionListener.onProcessComplete();
    }
}

interface IProcessCompletionListener {
    void onProcessComplete();
}
}
0 голосов
/ 07 июня 2011

Я думаю, вы должны использовать блок синхронизации в методах, которые вызываются одновременно.

пример

public void onCancel(String pTaskId) {
synchronized(this){
    remove(pTaskId);
    cancelledIds.add(pTaskId);
}
}
...