Потоки ReentrantLock заканчиваются случайным образом - PullRequest
2 голосов
/ 06 апреля 2020

Я работал над школьным заданием, посвященным многопоточности Java. Одна из задач, над которыми я застрял, заключается в том, что нам нужно создать несколько потоков в разных группах, и, если в каждой группе по 4 потока, только тогда они могут быть освобождены для работы в унисон, в противном случае их необходимо приостановить. / в режиме ожидания. Например:

  • Поток a, b, c присоединяется к группе 7, все они удерживаются / ожидают.
  • Поток d присоединяется к группе 7, все четыре потока (a , b, c, d) сигнализируются о прекращении.
  • Поток e, f, g, h, i присоединяется к группе 8, в этом случае e, f, g, h будет сигнализироваться как прервано, пока поток i находится в ожидании.
  • Поток j присоединяется к группе 7, он включается для ожидания.

Это общая задача, с которой я справился. Задача, над которой я работаю, требует, чтобы мы выпустили ПЕРВОНАЧАЛЬНЫЕ первые 4 потока группы, а остальные должны подождать, пока 4 из предыдущих потоков не вызовут Finished ().

Например, 3 потока присоединяются к группе 65 они ждут. Другой поток присоединяется к группе 65, и все 4 потока освобождаются вместе. Сейчас 4 темы работают (прекращено). Теперь потоки e, f, g, h, i, j, k, l присоединяются к группе 65. Все они должны ждать, пока e, f, g, h не вызовут метод finish ().

Здесь это то, что я сделал до сих пор:

ExtrinsicSyn c. java:

import java.util.HashMap;
import java.util.concurrent.locks.ReentrantLock;

public class ExtrinsicSync {

    private HashMap<Integer, ConditionWrapper> groupThreadCount;
    private ReentrantLock monitor;
    private int count = 0;

    ExtrinsicSync() {
        groupThreadCount = new HashMap<>();
        monitor = new ReentrantLock();
    }

@Override
public void waitForThreadsInGroup(int groupId) {
    monitor.lock();

    if (!groupThreadCount.containsKey(groupId))
        groupThreadCount.put(groupId, new ConditionWrapper(monitor.newCondition()));

    ConditionWrapper condWrapper = groupThreadCount.get(groupId);
    condWrapper.setValue(condWrapper.getValue() + 1);

    if(condWrapper.getValue() == 4 && condWrapper.getInitialStatus())
    {
        condWrapper.getCondition().signalAll();
        condWrapper.setInitialStatus(false);

        System.out.println("Terminating group: " + groupId + "FROM INITIAL STATE: " + ++count);
    } else {
        System.out.println("Putting thread from group: " + groupId + " on wait: " + ++waitcount);
        try { condWrapper.getCondition().await(); }
        catch (InterruptedException e) { e.printStackTrace(); }

    }

    monitor.unlock();
}

@Override
public void finished(int groupId) {
    monitor.lock();
    ConditionWrapper condWrapper = groupThreadCount.get(groupId);

    if(!condWrapper.getInitialStatus())
    {
        condWrapper.setFinishedCount(condWrapper.getFinishedCount() + 1);
        System.out.println("Group: " + groupId + "FINISHED COUNT: " + condWrapper.getFinishedCount());
        if(condWrapper.getFinishedCount() == 4)
        {
            condWrapper.setFinishedCount(0);
            condWrapper.getCondition().signalAll();
            System.out.println("Terminating threads for group: " + groupId + ": " + ++count);
        }
    }
    monitor.unlock();
}

ExtrinsicSyncTest. java:

import org.junit.Test;

import java.util.EnumMap;

class TestTask1 implements Runnable{

    final int group;
    final ExtrinsicSync s1;

    TestTask1(int group, ExtrinsicSync s1)
    {
        this.group = group;
        this.s1 = s1;
    }

    public void run() { s1.waitForThreadsInGroup(group); s1.finished(group); }
}

public class ExtrinsicSyncTest {

    @Test
    public void testPhaseThreethreads() {

        int nThreads = 22;

        Thread t[] = new Thread[nThreads];
        final ExtrinsicSync s1 = new ExtrinsicSync();

        for(int i = 0; i < nThreads/2; i++)
            (t[i] = new Thread(new TestTask1(66, s1))).start();

        for(int i = nThreads/2; i < nThreads; i++)
            (t[i] = new Thread(new TestTask1(70, s1))).start();

        for (Thread ti : t)
        {
            try { ti.join(100); }
            catch (Exception e) { System.out.println(e); }
        }

        EnumMap<Thread.State, Integer> threadsInThisState = new EnumMap<>(Thread.State.class);

        for (Thread.State s : Thread.State.values())
            threadsInThisState.put(s, 0);

        for (Thread ti : t)
        {
            Thread.State state = ti.getState();
            int n = threadsInThisState.get(state);
            threadsInThisState.put(state, n + 1);
        }

        System.out.println("threadsInThisState: " + threadsInThisState.toString() );

    }
}

ConditionWrapper. java:

import java.util.concurrent.locks.Condition;

public class ConditionWrapper {
    private Condition cond;
    private Integer value;
    private Integer finishedCount;
    private boolean initialThreads;

    public ConditionWrapper(Condition condition)
    {
        this.cond = condition;
        this.value = 0;
        this.finishedCount = 0;
        this.initialThreads = true;
    }
    // Returns the condition object of current request
    public Condition getCondition()
    {
        return this.cond;
    }
    // Gets the current counter of threads waiting in this queue.
    public Integer getValue()
    {
        return this.value;
    }
    // Sets the given value. Used for resetting the counter.
    public void setValue(int value) { this.value = value; }
    // Sets the counter to help keep track of threads which called finished() method
    public void setFinishedCount(int count) { this.finishedCount = count; }
    // Gets the finished count.
    public Integer getFinishedCount() { return this.finishedCount; }
    // This flag is to identify initial threads of a group
    public boolean getInitialStatus() { return initialThreads; }
    public void setInitialStatus(boolean val) { this.initialThreads = val; }
}

Проблема, с которой я столкнулся, заключается в том, что я могу освободить первые четыре потока каждой группы, но так или иначе, где-то 2 потока заканчиваются случайным образом, и я не могу понять, что происходит. Например, если описанный выше 22 тестового потока разделен на две группы, только 8 потоков должны быть завершены, в то время как остальные ожидают.

Но здесь вместо 10 потоков завершается. Я не понимаю, что происходит. Я сократил код до минимума как можно лучше.

1 Ответ

2 голосов
/ 08 апреля 2020

Проблема в том, что для не исходных потоков (getInitialStatus == false) вы не сигнализируете другим потокам, но все равно прекращаете их, когда достигли четырех из них. Вот что происходит:

  1. первые три потока увеличивают количество и ждут
  2. четвертый поток достигает счетчика == 4 и устанавливает initial = false и сигнализирует все остальные потоки и устанавливает счетчик к нулю
  3. следующие три потока увеличивают счет на один
  4. , 8 потоков достигает счетчика == 4 и завершается. Поскольку getInitialStatus == false, этот поток не уведомляет другие потоки.

, поэтому 4 * 2 потока + 2 потока завершаются. Именно то количество, которое вы видели в своих тестах.


Вот потенциальный способ реализовать это:

  1. использовать флаг canExecute в каждом потоке или задаче
  2. используйте метод Calculate для вычисления текущего состояния и установите флаг в значение true, если потоку разрешено выполняться.
  3. хранит все потоки, которые ожидают в списке или что-то подобное

Таким образом, ваша задача будет выглядеть следующим образом:

Task
  boolean canExeute

Затем метод waitForThreadsInGroup будет выглядеть следующим образом:

waitForThreadsInGroup
  monitor.lock();
      add task to list
      calculateTaskState
      condition.notifyAll
      while( ! task.canExcecute )
      {
        condition.await.
      }

  monitor.unlock();

Метод fini sh выглядит примерно так:

  finish
    monitor.lock();
    decrement finish count
    calculateTaskState
   condition.notifyAll
   monitor.unlock();

И рассчитатьTaskState

calculateTaskState
  if( finishCount == 0)
  {
      if( taskList.size >= 4  )
      {
         set 4 tasks in this list to can execute and remove them from the list
      }
  }

Таким образом, хитрость заключается в том, чтобы разделить логи c на три шага:

  1. действие, например, уменьшив число финалов sh
  2. расчет нового состояния. И решить для каждого потока, разрешено ли выполнять
  3. И ожидание потоков. Каждый поток должен ждать своего собственного флага
...