Какова логика выбора потоков из пулов потоков в параллельном API Java? - PullRequest
0 голосов
/ 05 октября 2018

Параллельный API Java предлагает множество полезных библиотек и функций, которые значительно упрощают работу с асинхронными и многопоточными потоками управления.Одной из таких функций является Пулы потоков .

Ранее сегодня я экспериментировал с API параллелизма и заметил несколько странных шаблонов выбора потоков из пула потоков.Они заставили меня задуматься о том, какая логика стоит за выбором потоков.Ниже приведен пример кода, который, возможно, даст вам пример того, о чем я говорю.Вы заметите шаблон в имени потока (внутри [ ]), когда программа регистрирует каждый tick.Хотя шаблон может не отображаться в сборках JDK, отличных от Oracle JDK 1.8.0_161, на 64-разрядной машине с Windows 10.

Несмотря на это, мой вопрос имеет отношение не к какой-либо случайной структуре, а скорее к процессувыбора потока из пула потоков.Шаблон заставляет меня верить, что он не совсем случайный, так какова логика этого выбора?Благодарю.:)

public static void main(String[] args)
{
    // create a ScheduledExecutorService with a Thread Pool of 7 threads
    ScheduledExecutorService ses = Executors.newScheduledThreadPool(7);
    log("go");
    // starts a timer of 30 seconds, shutting down ses afterwards
    ses.schedule(() -> call(ses), 30, TimeUnit.SECONDS);
    // starts the ticker
    ses.schedule(() -> tick(ses, 1), 1, TimeUnit.SECONDS);
}

// ticks once per second, logging the current tick counter. (i.e, counts
// by 1 each second) ticking ends when ses is shutdown.
public static void tick(ScheduledExecutorService ses, int count)
{
    if (!ses.isShutdown())
    {
        log("tick %d", count);
        ses.schedule(() -> tick(ses, count + 1), 1, TimeUnit.SECONDS);
    }
}

// called when it's time to shutdown ses.
public static void call(ScheduledExecutorService ses)
{
    log("done");
    ses.shutdown();
}

// formats and logs the given message alongside a timestamp and the name
// of the executing thread
public static void log(String s, Object...args)
{
    SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS");
    String time = sdf.format(new Date(System.currentTimeMillis()));
    String thread = Thread.currentThread().getName();
    String message = String.format(s, args);
    String log = String.format("%s [%s] - %s", time, thread, message);
    System.out.println(log);
}

1 Ответ

0 голосов
/ 06 октября 2018

Созданный вами ScheduledThreadPool содержит DelayQueue внутри.Когда вы отправляете задачу, она всегда создает новый рабочий поток, который будет продолжать принимать задачу из DelayQueue.

кода, когда вы отправляете задачу, новый поток продолжает принимать задачу

//getTask() method calls DelayQueue#take()
while (task != null || (task = getTask()) != null) {
       ......
       task.run();
       ......
}

метод take() возвращает задачу, если метод getDelay возвращает ноль, в противном случае рабочий поток будет ожидать окончания обратного отсчета. Когда наступит время, он разбудит рабочий поток, чтобы получить задачу.

код DelayQueue # take ()

/**
 * Retrieves and removes the head of this queue, waiting if necessary
 * until an element with an expired delay is available on this queue.
 *
 * @return the head of this queue
 * @throws InterruptedException {@inheritDoc}
 */
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
            if (first == null)
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    return q.poll();
                first = null; // don't retain ref while waiting
                if (leader != null)
                    available.await();
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();
    }
}

когда вы отправляете 7 задач, 7 рабочих потоков ожидают своего пробуждения.поэтому выбор потока зависит от стратегии пробуждения. Как правило, он активируется в соответствии с порядком очередей ожидания.

...