Наличие локальных потоков в Cython, чтобы я мог изменить их размер? - PullRequest
1 голос
/ 11 ноября 2019

У меня есть интервально-древовидный алгоритм, который я хотел бы запустить параллельно для многих запросов с использованием потоков. Проблема в том, что тогда каждому потоку понадобится свой собственный массив, так как я не могу заранее знать, сколько будет попаданий.

Есть и другие подобные вопросы, и решение всегда состоит в том, чтобы иметь массив размера(K, t) где K - выходная длина, а t - количество потоков. Это не работает для меня, поскольку K может отличаться для каждого потока, и каждому потоку может потребоваться изменить размер массива, чтобы соответствовать всем полученным результатам.

Псевдокод:

for i in prange(len(starts)):

    qs, qe, qx = starts[i], ends[i], index[i]

    results = t.search(qs, qe)

    if len(results) + nfound < len(output):
        # add result to output
    else:
        # resize array
        # then add results

1 Ответ

2 голосов
/ 12 ноября 2019

Обычный шаблон состоит в том, что каждый поток получает свой собственный контейнер, который является компромиссом между скоростью / сложностью и накладными расходами памяти:

  1. нет необходимости блокировать доступ к этому контейнерупотому что только один поток обращается к нему.
  2. намного меньше накладных расходов по сравнению с "собственным контейнером для каждой задачи (т. Е. Каждое i -значение)".

После параллельной секции данные должны быть собраны либов конечном контейнере на этапе постобработки (что также может происходить параллельно) или в последующих алгоритмах должна быть возможность обрабатывать коллекцию контейнеров.

Вот пример использования вектора c ++ - (который уже имеет памятьвстроенное управление и увеличение размера):

%%cython -+ -c=/openmp --link-args=/openmp

from cython.parallel import prange, threadid
from libcpp.vector cimport vector
cimport openmp

def calc_in_parallel(N):    
    cdef int i,k,tid
    cdef int n = N
    cdef vector[vector[int]] vecs
    # every thread gets its own container
    vecs.resize(openmp.omp_get_max_threads())
    for i in prange(n, nogil=True):  
        tid = threadid()
        for k in range(i):
            # use container of the thread
            vecs[tid].push_back(k) # dummy for calculation

    return vecs

Использование omp_get_max_threads() для количества потоков во многих случаях переоценивает реальное количество потоков. Вероятно, более надежно установить число потоков явно в prange, т. Е.

...
NUM_THREADS = 2
vecs.resize(NUM_THREADS)
for i in prange(n, nogil=True, num_threads = NUM_THREADS): 
...

Аналогичный подход может быть применен с использованием чистого C, но потребуется больший код платформы (управление памятью) вэто дело.

...