Я работал над школьным заданием, посвященным многопоточности Java. Одна из задач, над которыми я застрял, заключается в том, что нам нужно создать несколько потоков в разных группах, и, если в каждой группе по 4 потока, только тогда они могут быть освобождены для работы в унисон, в противном случае их необходимо приостановить. / в режиме ожидания. Например:
- Поток a, b, c присоединяется к группе 7, все они удерживаются / ожидают.
- Поток d присоединяется к группе 7, все четыре потока (a , b, c, d) сигнализируются о прекращении.
- Поток e, f, g, h, i присоединяется к группе 8, в этом случае e, f, g, h будет сигнализироваться как прервано, пока поток i находится в ожидании.
- Поток j присоединяется к группе 7, он включается для ожидания.
Это общая задача, с которой я справился. Задача, над которой я работаю, требует, чтобы мы выпустили ПЕРВОНАЧАЛЬНЫЕ первые 4 потока группы, а остальные должны подождать, пока 4 из предыдущих потоков не вызовут Finished ().
Например, 3 потока присоединяются к группе 65 они ждут. Другой поток присоединяется к группе 65, и все 4 потока освобождаются вместе. Сейчас 4 темы работают (прекращено). Теперь потоки e, f, g, h, i, j, k, l присоединяются к группе 65. Все они должны ждать, пока e, f, g, h не вызовут метод finish ().
Здесь это то, что я сделал до сих пор:
ExtrinsicSyn c. java:
import java.util.HashMap;
import java.util.concurrent.locks.ReentrantLock;
public class ExtrinsicSync {
private HashMap<Integer, ConditionWrapper> groupThreadCount;
private ReentrantLock monitor;
private int count = 0;
ExtrinsicSync() {
groupThreadCount = new HashMap<>();
monitor = new ReentrantLock();
}
@Override
public void waitForThreadsInGroup(int groupId) {
monitor.lock();
if (!groupThreadCount.containsKey(groupId))
groupThreadCount.put(groupId, new ConditionWrapper(monitor.newCondition()));
ConditionWrapper condWrapper = groupThreadCount.get(groupId);
condWrapper.setValue(condWrapper.getValue() + 1);
if(condWrapper.getValue() == 4 && condWrapper.getInitialStatus())
{
condWrapper.getCondition().signalAll();
condWrapper.setInitialStatus(false);
System.out.println("Terminating group: " + groupId + "FROM INITIAL STATE: " + ++count);
} else {
System.out.println("Putting thread from group: " + groupId + " on wait: " + ++waitcount);
try { condWrapper.getCondition().await(); }
catch (InterruptedException e) { e.printStackTrace(); }
}
monitor.unlock();
}
@Override
public void finished(int groupId) {
monitor.lock();
ConditionWrapper condWrapper = groupThreadCount.get(groupId);
if(!condWrapper.getInitialStatus())
{
condWrapper.setFinishedCount(condWrapper.getFinishedCount() + 1);
System.out.println("Group: " + groupId + "FINISHED COUNT: " + condWrapper.getFinishedCount());
if(condWrapper.getFinishedCount() == 4)
{
condWrapper.setFinishedCount(0);
condWrapper.getCondition().signalAll();
System.out.println("Terminating threads for group: " + groupId + ": " + ++count);
}
}
monitor.unlock();
}
ExtrinsicSyncTest. java:
import org.junit.Test;
import java.util.EnumMap;
class TestTask1 implements Runnable{
final int group;
final ExtrinsicSync s1;
TestTask1(int group, ExtrinsicSync s1)
{
this.group = group;
this.s1 = s1;
}
public void run() { s1.waitForThreadsInGroup(group); s1.finished(group); }
}
public class ExtrinsicSyncTest {
@Test
public void testPhaseThreethreads() {
int nThreads = 22;
Thread t[] = new Thread[nThreads];
final ExtrinsicSync s1 = new ExtrinsicSync();
for(int i = 0; i < nThreads/2; i++)
(t[i] = new Thread(new TestTask1(66, s1))).start();
for(int i = nThreads/2; i < nThreads; i++)
(t[i] = new Thread(new TestTask1(70, s1))).start();
for (Thread ti : t)
{
try { ti.join(100); }
catch (Exception e) { System.out.println(e); }
}
EnumMap<Thread.State, Integer> threadsInThisState = new EnumMap<>(Thread.State.class);
for (Thread.State s : Thread.State.values())
threadsInThisState.put(s, 0);
for (Thread ti : t)
{
Thread.State state = ti.getState();
int n = threadsInThisState.get(state);
threadsInThisState.put(state, n + 1);
}
System.out.println("threadsInThisState: " + threadsInThisState.toString() );
}
}
ConditionWrapper. java:
import java.util.concurrent.locks.Condition;
public class ConditionWrapper {
private Condition cond;
private Integer value;
private Integer finishedCount;
private boolean initialThreads;
public ConditionWrapper(Condition condition)
{
this.cond = condition;
this.value = 0;
this.finishedCount = 0;
this.initialThreads = true;
}
// Returns the condition object of current request
public Condition getCondition()
{
return this.cond;
}
// Gets the current counter of threads waiting in this queue.
public Integer getValue()
{
return this.value;
}
// Sets the given value. Used for resetting the counter.
public void setValue(int value) { this.value = value; }
// Sets the counter to help keep track of threads which called finished() method
public void setFinishedCount(int count) { this.finishedCount = count; }
// Gets the finished count.
public Integer getFinishedCount() { return this.finishedCount; }
// This flag is to identify initial threads of a group
public boolean getInitialStatus() { return initialThreads; }
public void setInitialStatus(boolean val) { this.initialThreads = val; }
}
Проблема, с которой я столкнулся, заключается в том, что я могу освободить первые четыре потока каждой группы, но так или иначе, где-то 2 потока заканчиваются случайным образом, и я не могу понять, что происходит. Например, если описанный выше 22 тестового потока разделен на две группы, только 8 потоков должны быть завершены, в то время как остальные ожидают.
Но здесь вместо 10 потоков завершается. Я не понимаю, что происходит. Я сократил код до минимума как можно лучше.