Реализация пула потоков с использованием GCD - PullRequest
1 голос
/ 04 октября 2019

У меня есть большой цикл с вычислительными задачами, которые можно распараллелить. Для этой цели я решил написать простой пул параллельных потоков, используя GCD, так как я работаю на iOS.

Мой пул потоков выглядит довольно простым. Я приложу только .m файл, этого будет достаточно, чтобы понять мою идею:

#import "iOSThreadPool.h"

@interface iOSThreadPool()
{
    int                                     _timeout;
    int                                     _currentThreadId;
    NSMutableArray<dispatch_queue_t>        *_pool;
    NSMutableArray<dispatch_semaphore_t>    *_semaphores;
    dispatch_group_t                        _group;
}

@end

@implementation iOSThreadPool

- (instancetype)initWithSize:(int)threadsCount tasksCount:(int)tasksCount
{
    self = [super init];
    if (self) {
        _timeout = 2.0;
        _currentThreadId = 0;
        _pool = [NSMutableArray new];
        _semaphores = [NSMutableArray new];
        for (int i = 0; i < threadsCount; i++) {
            dispatch_queue_attr_t attr = dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_CONCURRENT, QOS_CLASS_BACKGROUND, 0);
            dispatch_queue_t queue = dispatch_queue_create([NSString stringWithFormat:@"com.workerQueue_%d", i].UTF8String, attr);
            [_pool addObject:queue];

            dispatch_semaphore_t sema = dispatch_semaphore_create(tasksCount);
            [_semaphores addObject:sema];
        }

        _group = dispatch_group_create();
    }

    return self;
}

- (void)async:(iOSThreadPoolBlock)block
{
    dispatch_group_enter(self->_group);

    __block dispatch_semaphore_t sema = _semaphores[_currentThreadId];
    dispatch_async(_pool[_currentThreadId], ^{

        dispatch_semaphore_wait(sema, dispatch_time(DISPATCH_TIME_NOW, (int64_t)(self->_timeout * NSEC_PER_SEC)));
        block();
        dispatch_semaphore_signal(sema);

        dispatch_group_leave(self->_group);
    });

    _currentThreadId = (_currentThreadId + 1) % _pool.count;
}

- (void)wait {
    dispatch_group_wait(_group, dispatch_time(DISPATCH_TIME_NOW, (int64_t)(self->_timeout * NSEC_PER_SEC)));
}

@end

Итак, в основном, когда я создаю пул потоков, я устанавливаю значения потоков и значения семафоров. Поскольку очереди параллельны, я хочу ограничить количество задач, которые могут выполняться одновременно, чтобы поток не перегружался.

Дело в том, что независимо от того, сколько потоков я создаю, это не влияет на производительность ввсе. Я полагаю, это происходит потому, что все задачи очереди отправки попадают в глобальную очередь, и независимо от того, сколько у меня очередей, все они большую часть времени отправляют свои задачи в одну и ту же очередь BACKGROUND.

I 'Я много читал о GCD и успешно использовал его в своей практике. Но когда я просто хочу пойти дальше простого использования, которое вы можете найти в бесчисленных уроках, например, выполнить несколько распараллеленных процессов с намерением сэкономить как можно больше времени выполнения - я терплю неудачу. И я искал более подробное объяснение или более подробные эффективные методы для GCD, я ничего не нашел. Похоже, 90% времени он используется очень просто. И в то же время я слышу, что GCD является очень очень мощным многопоточным фреймворком, поэтому ясно, что я просто не знаю, как правильно его использовать.

Так что мой вопрос - действительно ли это возможно запуститьнесколько параллельных процессов на iOS? Что я должен изменить в своем пуле потоков, чтобы сделать его более эффективным?

ПРИМЕЧАНИЕ. Я загрузил версию C ++ ThreadPool, основанную на std::thread. И если я изменю количество потоков в этом пуле, я ясно вижу снижение производительности. Я был бы очень признателен, если бы какой-нибудь GCD гуру смог указать мне, как использовать GCD на максимальной мощности.

1 Ответ

3 голосов
/ 17 октября 2019

GCD уже выполняет пул потоков (очереди отправки опираются на пул «рабочих потоков»), поэтому избыточно / неэффективно добавлять поверх этого еще один слой пула.

Вы говорите:

Дело в том, что независимо от того, сколько потоков я создаю, это никак не влияет на производительность.

Это может быть что угодно. Одна из распространенных проблем заключается в том, что единица работы слишком мала. * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * Вы должны удостовериться, что код вашей задачи выполняет разумную работу на каждой итерации. Как и в случае с любым блоком или функцией, которую вы отправляете в очередь, планирование этого кода для выполнения требует дополнительных затрат. Если каждая итерация вашего цикла выполняет только небольшой объем работы, накладные расходы по планированию кода могут перевесить выигрыш в производительности, который вы могли бы получить, отправляя его в очередь.

Но есть множестводругие проблемы, начиная от неэффективного кода синхронизации, сбоя кэша и т. д. Без воспроизводимого примера проблемы невозможно сказать. Хотя QoS также оказывает влияние, оно зачастую незначительно по сравнению с этими алгоритмическими проблемами.

Вы говорите:

Поскольку очереди параллельны, я хочу ограничить количество задач, которые могут быть выполненыодновременно, так что поток не будет перегружен.

Хотя вы можете достичь этого либо с ненулевыми семафорами диспетчеризации, либо с NSOperationQueue с некоторыми maxConcurrentOperationCount, dispatch_apply (известное как concurrentPerform для пользователей Swift) - это решение «go to» для распараллеленных подпрограмм с интенсивными вычислениями, которые балансируют рабочие нагрузки между ядрами ЦП. Он автоматически просматривает, сколько ядер у вас есть, и распределяет цикл между ними, не рискуя взорваться потоками. И, как указано в Улучшение цикла Loop Code , вы можете экспериментировать с шагами, которые хорошо справляются с балансировкой объема работы, выполняемой в каждом потоке, с внутренними издержками координации потоков. (Удаление также может минимизировать конкуренцию в кеше.)

Я мог бы предложить исследовать dispatch_apply и попробовать. Если вам все еще неясно в этот момент, просто опубликуйте новый вопрос, который показывает как непараллельную подпрограмму, так и параллельное представление, и мы можем помочь вам в дальнейшем.


Как я уже говорил выше,Я не думаю, что вы хотите эту рутину вообще. Для интенсивных вычислений я бы предпочел dispatch_apply. Для простых очередей, для которых я хотел бы контролировать степень параллелизма (особенно если некоторые из этих задач сами асинхронны), я бы использовал NSOperationQueue с maxConcurrentOperationCount. Но я подумал, что поделюсь несколькими наблюдениями о вашем фрагменте кода:

  • То, что вы реализовали, это пул очередей, а не пул потоков;

  • То, что вы называете threadsCount это не количество потоков, а количество очередей. Итак, если вы создаете пул со счетом 10 и tasksCount из 20, это означает, что вы потенциально используете 200 потоков.

  • Аналогично тому, что вы называете _currentThreadId не текущая тема. Это текущая очередь.

  • Взаимодействие с _currentThreadId не является поточно-ориентированным.

Итог, GCD имеет свой собственный пултемы, так что вы не должны воспроизводить эту логику. Все, что вам нужно сделать, это реализовать логику «не более threadCount» (что может быть достигнуто с помощью семафора с ненулевой диспетчеризацией). Таким образом, я бы предложил упростить это до следующего вида:

@interface ThreadPool()
@property (nonatomic, strong) dispatch_queue_t pool;
@property (nonatomic, strong) dispatch_queue_t scheduler;
@property (nonatomic, strong) dispatch_semaphore_t semaphore;
@end

@implementation ThreadPool

- (instancetype)initWithThreadCount:(int)threadCount {
    self = [super init];
    if (self) {
        NSString *identifier = [[NSUUID UUID] UUIDString];
        NSString *bundleIdentifier = [[NSBundle mainBundle] bundleIdentifier];

        NSString *schedulingLabel = [NSString stringWithFormat:@"%@.scheduler.%@", bundleIdentifier, identifier];
        _scheduler = dispatch_queue_create(schedulingLabel.UTF8String, DISPATCH_QUEUE_SERIAL);

        NSString *poolLabel = [NSString stringWithFormat:@"%@.pool.%@", bundleIdentifier, identifier];

        dispatch_queue_attr_t attr = dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_CONCURRENT, QOS_CLASS_BACKGROUND, 0);
        _pool = dispatch_queue_create(poolLabel.UTF8String, attr);

        _semaphore = dispatch_semaphore_create(threadCount);
    }

    return self;
}

- (void)async:(ThreadPoolBlock)block {
    dispatch_async(self.scheduler, ^{
        dispatch_semaphore_wait(self.semaphore, DISPATCH_TIME_FOREVER);
        dispatch_async(self.pool, ^{
            block();
            dispatch_semaphore_signal(self.semaphore);
        });
    });
}

@end

Нет необходимости говорить, что эта реализация, как и ваша, предполагает, что блок, переданный методу async, сам по себе является синхронным (например, это не так). запуск еще одного асинхронного процесса, такого как сетевой запрос или что-то еще). Я подозреваю, что вы это знаете, но я упоминаю это только для полноты.

...