Стратегия «Процессы» или «Потоки» для заполнения независимых блоков в глобальный массив? - PullRequest
0 голосов
/ 14 февраля 2019

В python2 я хотел бы заполнить глобальный массив, заполнив параллельными процессами (или потоками) различные под-массивы (всего 16 блоков) .Я должен уточнить, что каждый блок не зависит от других, я имею в виду, когда я выполняю назначение каждой ячейки текущего блока.

1) Из того, что я нашел, я получил бы большую выгоду отмногоядерные процессоры с использованием различных «processes», но кажется немного сложным разделить глобальный массив всеми другими процессами.

2) С другой точки зрения, я могу использовать «threads "вместо" processes ", поскольку реализация менее сложна.Я обнаружил, что библиотека "ThreadPool" из "multiprocessing.dummy" позволяет совместно использовать этот глобальный массив всем другим параллельным потокам.

Например, для python2.7 работает следующий код:

from multiprocessing.dummy import Pool as ThreadPool

## discretization along x-axis and y-axis for each block
arrayCross_k = np.linspace(kMIN, kMAX, dimPoints)
arrayCross_mu = np.linspace(-1, 1, dimPoints)
# Build all big matrix with N total blocks = dimBlock*dimBlock = 16 here
arrayFullCross = np.zeros((dimBlocks, dimBlocks, arrayCross_k.size, arrayCross_mu.size))
dimBlocks = 4
# Size of dimension along k and mu axis
dimPoints = 100
# dimension along one dimension of global arrayFullCross
dimMatCovCross = dimBlocks*dimPoints

# Build cross-correlation matrix 
def buildCrossMatrix_loop(params_array):
  # rows indices
  xb = params_array[0]
  # columns indices
  yb = params_array[1]
  # Current redshift
  z = zrange[params_array[2]]
  # Loop inside block
  for ub in range(dimPoints):
    for vb in range(dimPoints):
      # Diagonal blocs 
      if (xb == yb):
      # Fill the (xb,yb) su-block of global array by 
        arrayFullCross[xb][xb][ub][vb] = 2*P_obs_cross(arrayCross_k[ub], arrayCross_mu[vb] , z, 10**P_m(np.log10(arrayCross_k[ub])),

        ...
        ...

# End of function buildCrossMatrix_loop

# Main loop
while i < len(zrange):

  def generatorCrossMatrix(index):
    for igen in range(dimBlocks):
      for lgen in range(dimBlocks):
        yield igen, lgen, index

if __name__ == '__main__':

  # Use 20 threads
  pool = ThreadPool(20)
  pool.map(buildCrossMatrix_loop, generatorCrossMatrix(i))

  # Increment index "i"
  i = i+1

Но, к сожалению, даже используя 20 потоков, я понимаю, что ядра моего процессора не работают полностью (на самом деле, с помощью команды top или htop, я вижу только один процесс на 100%).

3) Какую стратегию я должен выбрать, если я хочу в полной мере использовать 16 ядер моего процессора (как это имеет место в pool.map(function, generator)) but with also the sharing of global array?

4)некоторые люди говорили мне делать ввод / вывод для каждого подмассива (в основном, записывать каждый блок в файл и собирать все подмассивы, читая их и заполняя полный массив).Это решение удобно, но я бы хотел избежать ввода-вывода (если только нет других решений).

5) Я практиковал MPI library с C language и операцию заполнения подмассива инаконец собрать их, чтобы построить большой массив, не очень сложно.Однако я не хотел бы использовать MPI с языком Python (даже не знаю, существует ли он).

6) Я также пытался использовать Process с целью, равной моей функции заполнения(buildCrossMatrix_loop), например, в while Основной цикл выше:

from multiprocessing import Process

# Main loop on z range
while i < len(zrange):

  params_p = []
  for ip in range(4):
    for jp in range(4):
      params_p.append(ip)
      params_p.append(jp)
      params_p.append(i)
      p = Process(target=buildCrossMatrix_loop, args=(params_p,))
      params_p = []
      p.start()

  # Finished : wait everybody
  p.join()

  ...
  ...

  i = i+1
  # End of main while loop

Но конечный 2D глобальный массив заполнен только нулями.Поэтому я должен сделать вывод, что функция Process не разделяет массив для заполнения?

7) Так какую стратегию мне нужно искать?:

1.Использование «процессов пула» и поиск способа поделиться глобальным массивом, зная, что все мои 16-ядерные процессоры будут работать

2.Использование «потоков» и общего глобального массива, но производительность, на первый взгляд, кажется менее хорошей, чем с «процессами пула».Может быть, есть способ увеличить мощность каждого «Потока», я имею в виду, например, «пул процессов»?

Я пытался следовать различным примерам на https://docs.python.org/2/library/multiprocessing.html, но безуспешноТо есть без соответствующих характеристик с точки зрения ускорения.

Я думаю, что в моем случае основной проблемой является сбор всех подмассивов ИЛИ тот факт, чтоГлобальный массив arrayFullCross не используется другими процессами или потоками.

Если бы у кого-то был простой пример совместного использования глобальной переменной в многопоточном контексте (здесь это массив), это было бы неплохочтобы поставить его здесь.

ОБНОВЛЕНИЕ 1: Я сделал тест с Threading (а не multiprocessing), но производительность остается довольно плохой.GIL, по-видимому, не разблокирован, т. Е. В команде htop появляется только один процесс (возможно, версия библиотеки Threading не подходит).

Поэтому я попытаюсь решить мою проблему с помощью команды " метод возврата".

Наивно я пытался вернуть весь массив в конце функции, к которой я применяю функцию map, например:

# Build cross-correlation matrix 
def buildCrossMatrix_loop(params_array):

  # rows indices
  xb = params_array[0]
  # columns indices
  yb = params_array[1]
  # Current redshift
  z = zrange[params_array[2]]
  # Loop inside block
  for ub in range(dimPoints):
    for vb in range(dimPoints):
      # Diagonal blocs 
      if (xb == yb):         
        arrayFullCross[xb][xb][ub][vb] = 2*P_obs_cross(arrayCross_k[ub], arrayCross_mu[vb])

      ... 
      ... #others assignments on arrayFullCross elements

  # Return global array to main process
  return arrayFullCross

Затем я попытался получить этот глобальный массив из map следующим образом:

if __name__ == '__main__':

  pool = Pool(16)
  outputArray = pool.map(buildCrossMatrix_loop, generatorCrossMatrix(i))
  pool.terminate()
  ## Print outputArray
  print 'outputArray = ', outputArray

  ## Reshape 4D outputArray to 2D array
  arrayFullCross2D_swap = np.array(outputArray).swapaxes(1,2).reshape(dimMatCovCross,dimMatCovCross)

К сожалению, когда я печатаю outputArray, я получаю:

outputArray =  [None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None]

Это не ожидаемый выходной массив 4D, просто список из 16 значений Нет (я думаю, что число 16 соответствует числу процессов, предоставляемых generatorCrossMatrix(i)).

Как я могу вернуть весь массив 4D один раз?map запускается и когда он закончится?

1 Ответ

0 голосов
/ 15 февраля 2019

Прежде всего, я считаю, multiprocessing.ThreadPool - это частный API, поэтому его следует избегать.Теперь multiprocessing.dummy - бесполезный модуль.Он не выполняет многопоточность / обработку, поэтому вы не видите никакой выгоды.Вы должны использовать «обычный» модуль multiprocessing.

Второй код не работает, потому что он использует несколько процессов .Процессы не разделяют память, поэтому изменения, которые вы делаете в подпроцессе, не отражаются в других подпроцессах или основном процессе.Вы либо хотите:

  • Вернуть значение и объединить их вместе в основном процессе, например, используя multiprocessing.Pool.map
  • Использовать threading вместо multiprocessing: just replace Импорт многопроцессорной обработки with импорт потоков and мультипроцесс1027 * освобождает GIL во время вычислений, в противном случае он зависнет на 1 ЦП.

    Возможно, вы захотите посмотреть этот похожий вопрос , на который я ответил паруминут назад.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...