Я сделал свою маленькую поточно-ориентированную (подтвержденную многими тестами) библиотеку, подобную Looper и Handler в Android для Java с некоторыми другими расширениями. Все отлично отлично и работает довольно хорошо (проверено профилированием). Что он делает, так это то, что у каждого потока есть своя очередь, а у очереди есть дескриптор. Используя этот обработчик, каждый другой поток может поместить в очередь работоспособный или вызываемый.
Итак, просто чтобы проверить, как он работает под большой нагрузкой, я создал небольшую программу, в которой я генерирую 1000 потоков, и каждый поток публикует 10 Runnables во всех других потоках (т. Е. 999 других потоков), а затем, наконец, каждый поток распознает запрос на отключение для всех остальных. нить. Это прекрасно работает, если я выполняю это для
Участок памяти, используемый для свободной памяти со временем
и ниже это журнал:
-8,035: [GC 170956K-> 2635K (517952K), 0,0063501 с]
-8,610: [GC 170955K-> 15507K (512832K), 0,1745043 с]
9,543: [GC 175507K-> 36673K (516928K), 0,3060842 с]
8,610: [GC 170955K-> 15507K (512832K), 0,1745043 с]
9,543: [GC 175507K-> 36673K (516928K), 0,3060842 с]
10,321: [GC 196673K-> 54009K (488896K), 0,3298292 с]
11,431: [GC 185977K-> 84189K (502912K), 0,2965050 с]
12,818: [GC 216157K-> 111017K (458752K), 0,4901188 с]
16,660: [GC 198825K-> 144073K (470528K), 0,7027064 секунды]
17,616: [GC 231881K-> 156962K (479424K), 0,3269871 с]
18,407: [GC 247490K-> 177262K (483584K), 0,2446838 с]
18,924: [GC 268206K-> 194510K (481216K), 0,2892096 с]
19,446: [GC 285454K-> 210302K (487168K), 0,3186975 с]
20,022: [GC 308734K-> 225446K (485120K), 0,2547074 с]
20,610: [GC 323878K-> 242026K (490624K), 0,2302190 с]
21.109: [GC 348010K-> 260490K (489216K), 0,2775054 с]
21,692: [GC 366474K-> 280570K (491200K), 0,3264718 с]
22,359: [GC 389114K-> 300690K (491200K), 0,3274282 секунды]
23,097: [GC 409234K-> 321958K (490944K), 0,3410703 секунды]
23,722: [GC 430374K-> 340846K (491136K), 0,3375997 с]
24,060: [Полная GC 340846K-> 303754K (491136K), 2,2736636 секунд]
26,727: [GC 412170K-> 324490K (492096K), 0,1968805 с]
27,235: [GC 434698K-> 345614K (491968K), 0,2752622 с.]
27,510: [Полная GC 345614K-> 334283K (491968K), 2,1151760 с]
29,968: [Полная GC 444491K-> 349326K (491968K), 2,5176330 секунд]
32,817: [Полная GC 459534K-> 348355K (491968K), 3,2688566 с]
36,553: [Полная GC 458563K-> 371805K (491968K), 2,3835641 с]
39,211: [Полная GC 459776K-> 395739K (491968K), 2,2324407 с]
41,654: [Полная GC 459776K-> 409135K (491968K), 2,2631054 с]
44.113: [Полная GC 459776K-> 396769K (491968K), 3,4707959 с]
47,930: [Полная GC 459775K-> 415668K (491968K), 2,9166601 с]
51,051: [Полная GC 459775K-> 425117K (491968K), 2,6670247 с]
53,886: [Полная GC 459775K-> 432457K (491968K), 2,2265421 с]
56,192: [Полная GC 459775K-> 422948K (491968K), 3,2675329 с]
59,651: [Полная GC 459775K-> 436339K (491968K), 2,3835789 с]
62,136: [Полная GC 459775K-> 441349K (491968K), 2,2442554 с]
64,433: [Полная GC 459775K-> 445241K (491968K), 2,2672156 с]
66,750: [Полная GC 459775K-> 437517K (491968K), 3,2987756 с]
70.109: [Полная GC 459776K-> 447665K (491968K), 1,9295598 с]
72.069: [Полная GC 459776K-> 449837K (491968K), 1.8525232 с]
73,966: [Полная GC 459776K-> 451969K (491968K), 1,9544717 с]
75,956: [Полная GC 459776K-> 445178K (491968K), 3,3964743 с]
и так до тех пор, пока не будет выдана ошибка.Далее я обнаружил, что, как только потоки начали завершаться, число # уменьшалось до 167, но остальные никогда не прекращались.
Не может быть никаких условий гонки, потому что он хорошо работает для <500 потоков. Теперь я знаю, что это может быть связано с голодом, но если бы причиной было голодание, то это произошло бы, когда было 1000 нитей, а не когда осталось только 150. </p>
В чем может быть причина этого?
Ниже приведен короткий фрагмент кода:
protected static <T> Future<T> execute(final Double id, Callable<T> call)
{
if(call==null)
throw new NullPointerException("Callable is null");
synchronized(id)
{
if(mapBoolean.get(id))
{
setPollTime(0);
throw new RejectedExecutionException();
}
RunnableFuture<T> ftask = new FutureTask<T>(call);
LinkedBlockingQueue<Runnable> queue = map.get(id);
queue.add(ftask);//System.out.println("added");
return ftask;
}
}
и это код, где он выполняется
public static void loop() throws InterruptedException
{
LinkedBlockingQueue<Runnable> queue = map.get(tlocal.get());
Random random = new Random();// This is used instead of Math.random() so that
//there is less contention on Math.random(). See the API Documentation of Math.random()
while(!Thread.currentThread().isInterrupted())
{
try{
//Runnable runnable = queue.take(); cannot be used, as we will have to synchronize the whole block for
//atomicity, see @link shutDown() for more info
Runnable runnable = queue.poll(pollTime, TimeUnit.MILLISECONDS);
if(runnable!=null)//if the polled object is not null, if null try again
{
runnable.run();
}
Для отключения:
synchronized(id)
{
mapBoolean.put(id, !shut);
}
PS: у меня двухъядерный компьютер с оперативной памятью 2 ГБ
PPS: выделенный размер кучи 512