многопроцессный или многопоточный?- распараллеливание простого вычисления за миллионы итераций и сохранение результата в единой структуре данных - PullRequest
6 голосов
/ 20 февраля 2012

У меня есть словарь D из {string: list} записей, и я вычисляю функцию f (D [s1], D [s2]) -> float для пары строк (s1, s2) в D.

Кроме того, я создал собственный класс матрицы LabeledNumericMatrix, который позволяет мне выполнять назначения, такие как m [ID1, ID2] = 1.0.

Мне нужно вычислить f (x, y) исохранить результат в m [x, y] для всех 2-х наборов в наборе строк S, в том числе, когда s1 = s2.Это легко закодировать как цикл, но выполнение этого кода занимает довольно много времени, так как размер набора S увеличивается до больших значений, таких как 10000 или более.

Ни один из результатов, которые я храню в своем помеченномМатрица m зависит друг от друга.Поэтому кажется простым распараллелить это вычисление с помощью многопоточных или многопроцессорных сервисов python.Однако, поскольку cPython действительно не позволяет my одновременно выполнять вычисление f (x, y) и хранение m [x, y] посредством потоков, кажется, что многопроцессорность - мой единственный выбор.Однако я не думаю, что многопроцессорность предназначена для передачи структур данных размером 1 ГБ между процессами, таких как моя помеченная матричная структура, содержащая 10000x10000 элементов.

Может кто-нибудь дать совет (a), если мне следует избегать попыток распараллеливаниямой алгоритм, и (б) если я могу сделать распараллеливание, как это сделать, желательно в cPython?

Ответы [ 5 ]

6 голосов
/ 20 февраля 2012

Первый вариант - Серверный процесс

Создать Серверный процесс . Это часть пакета Multiprocessing, который обеспечивает параллельный доступ к структурам данных. Таким образом, каждый процесс будет напрямую обращаться к структуре данных, блокируя другие процессы.

С документация :

Серверный процесс

Объект manager, возвращаемый Manager (), контролирует процесс сервера, который содержит объекты Python и позволяет другим процессам манипулировать ими используя прокси.

Менеджер, возвращаемый Manager (), будет поддерживать список типов, dict , Пространство имен, Блокировка , RLock, Семафор, Ограниченный семафор, Условие, Событие, Очередь, Значение и Массив.

Второй вариант - Пул рабочих

Создать Пул рабочих , Очередь ввода и Очередь результатов.

  • Основной процесс, выступающий в роли производителя, будет снабжать входную очередь парами (s1, s2).
  • Каждый рабочий процесс будет читать пару из входной очереди и записывать результат в выходную очередь.
  • Основной поток прочитает результаты из очереди результатов и запишет их в словарь результатов.

Третий вариант - разделить на независимые проблемы

Ваши данные независимы: f (D [s i ], D [s j ]) является изолированной проблемой, независимой от любого f (D [s k ], D [s l ]). более того, время вычислений каждой пары должно быть примерно одинаковым или, по крайней мере, в том же порядке.

Разделите задачу на n наборов входов, где n - это количество вычислительных единиц (ядер или даже компьютеров), которые у вас есть. Присвойте каждому входному набору отдельный процесс и присоединитесь к выходу.

2 голосов
/ 20 февраля 2012

Вы определенно не получите никакого увеличения производительности с threading - это неподходящий инструмент для задач, связанных с процессором.

Так что единственный возможный выбор - multiprocessing, но поскольку у вас большойструктуру данных, я бы предложил что-то вроде mmap (довольно низкий уровень, но встроенный) или Redis (вкусный API высокого уровня, но должен быть установлен и настроен).

1 голос
/ 20 февраля 2012

Профилировали ли вы свой код?Это просто вычисление f, которое слишком дорого, или сохранение результатов в структуре данных (или, может быть, и то и другое)?

Если f доминирующее, то вы должны убедиться, что вы не можете внести улучшения алгоритма, прежде чем начатьбеспокоиться о распараллеливании.Вы можете получить большую скорость, превратив некоторые или все функции в расширение C, возможно, используя cython .Если вы используете многопроцессорность, то я не понимаю, зачем вам передавать всю структуру данных между процессами?

Если сохранение результатов в матрице слишком дорого, вы можете ускорить свой код, используяболее эффективная структура данных (например, array.array или numpy.ndarray ).Если вы не очень тщательно разрабатываете и реализуете свой собственный класс матрицы, он почти наверняка будет медленнее, чем у них.

0 голосов
/ 27 марта 2012

Что касается моего последнего комментария, прикрепленного к коду, опубликованному 21 марта, я обнаружил, что multiprocessing.Pool + SQLite (pysqlite2) непригоден для моей конкретной задачи, поскольку возникли две проблемы:

(1) При использовании соединения по умолчанию, за исключением первого рабочего, все остальные рабочие процессы, которые выполняли запрос вставки, выполнялись только один раз. (2) Когда я изменяю ключевые слова соединения на check_same_thread = False, тогда используется полный пул рабочих, но тогда только некоторые запросы выполняются успешно, а некоторые запросы не выполняются. Когда каждый работник также выполнял time.sleep (0.01), количество сбоев запросов уменьшалось, но не полностью. (3) Менее важно, я отчаянно слышал чтение / запись моего жесткого диска, даже для небольшого списка заданий из 10 запросов на вставку.

Затем я прибег к MySQL-Python, и все получилось намного лучше. Правда, для этого пользователя нужно настроить демон сервера MySQL, пользователя и базу данных, но эти шаги относительно просты.

Вот пример кода, который работал для меня. Очевидно, что его можно оптимизировать, но он дает основную идею для тех, кто ищет, как начать использовать многопроцессорность.

  1 from multiprocessing import Pool, current_process
  2 import MySQLdb
  3 from numpy import random
  4
  5 
  6 if __name__ == "__main__":
  7  
  8   numValues   = 50000
  9   tableName   = "tempTable"
 10   useHostName = ""
 11   useUserName = ""  # Insert your values here.
 12   usePassword = ""
 13   useDBName   = ""
 14   
 15   # Setup database and table for results.
 16   dbConnection = MySQLdb.connect( host=useHostName, user=useUserName, passwd=usePassword, db=useDBName )
 17   topCursor = dbConnection.cursor()
 18   # Assuming table does not exist, will be eliminated at the end of the script.
 19   topCursor.execute( 'CREATE TABLE %s (oneText TEXT, oneValue REAL)' % tableName )
 20   topCursor.close() 
 21   dbConnection.close()
 22   
 23   # Define simple function for storing results.
 24   def work( storeValue ):
 25     #print "%s storing value %f" % ( current_process().name, storeValue )
 26     try:
 27       dbConnection = MySQLdb.connect( host=useHostName, user=useUserName, passwd=usePassword, db=useDBName )
 28       cursor = dbConnection.cursor()
 29       cursor.execute( "SET AUTOCOMMIT=1" )
 30       try:
 31         query = "INSERT INTO %s VALUES ('%s',%f)" % ( tableName, current_process().name, storeValue )
 32         #print query
 33         cursor.execute( query )
 34       except:
 35         print "Query failed."
 36       
 37       cursor.close()
 38       dbConnection.close()
 39     except: 
 40       print "Connection/cursor problem."
 41   
 42   
 43   # Create set of values to assign
 44   values = random.random( numValues )
 45   
 46   # Create pool of workers 
 47   pool = Pool( processes=6 )
 48   # Execute assignments.
 49   for value in values: pool.apply_async( func=work, args=(value,) )
 50   pool.close()
 51   pool.join()
 52 
 53   # Cleanup temporary table.
 54   dbConnection = MySQLdb.connect( host=useHostName, user=useUserName, passwd=usePassword, db=useDBName )
 55   topCursor = dbConnection.cursor()
 56   topCursor.execute( 'DROP TABLE %s' % tableName )
 57   topCursor.close()
 58   dbConnection.close()
0 голосов
/ 21 марта 2012

Спасибо всем за ваши ответы.

Я создал решение (а не "решение") предложенной проблемы, и, поскольку другие могут найти его полезным, я публикую код здесь.Мое решение - гибрид вариантов 1 и 3, предложенных Адамом Матаном.Код содержит номера строк из моего сеанса vi, что поможет в обсуждении ниже.

 12 # System libraries needed by this module.
 13 import numpy, multiprocessing, time
 14 
 15 # Third-party libraries needed by this module.
 16 import labeledMatrix
 17 
 18 # ----- Begin code for this module. -----
 19 from commonFunctions import debugMessage
 20 
 21 def createSimilarityMatrix( fvFileHandle, fvFileParser, fvSimScorer, colIDs, rowIDs=None,
 22                             exceptionType=ValueError, useNumType=numpy.float, verbose=False,
 23                             maxProcesses=None, processCheckTime=1.0 ):
 24  """Create a labeled similarity matrix from vectorial data in [fvFileHandle] that can be
 25  parsed by [fvFileParser].
 26  [fvSimScorer] should be a function that can return a floating point value for a pair of vectors.
 27 
 28  If the matrix [rowIDs] are not specified, they will be the same as the [colIDs].
 29 
 30  [exceptionType] will be raised when a row or column ID cannot be found in the vectorial data.
 31  [maxProcesses] specifies the number of CPUs to use for calculation; default value is all available CPUs.
 32  [processCheckTime] is the interval for checking activity of CPUs (if completed calculation or not).
 33 
 34  Return: a LabeledNumericMatrix with corresponding row and column IDs."""
 35 
 36  # Setup row/col ID information.
 37  useColIDs = list( colIDs )
 38  useRowIDs = rowIDs or useColIDs
 39  featureData = fvFileParser( fvFileHandle, retainIDs=(useColIDs+useRowIDs) )
 40  verbose and debugMessage( "Retrieved %i feature vectors from FV file." % len(featureData) )
 41  featureIDs = featureData.keys()
 42  absentIDs = [ ID for ID in set(useColIDs + useRowIDs) if ID not in featureIDs ]
 43  if absentIDs: 
 44   raise exceptionType, "IDs %s not found in feature vector file." % absentIDs
 45  # Otherwise, proceed to creation of matrix.
 46  resultMatrix = labeledMatrix.LabeledNumericMatrix( useRowIDs, useColIDs, numType=useNumType )
 47  calculateSymmetric = True if set( useRowIDs ) == set( useColIDs ) else False
 48  
 49  # Setup data structures needed for parallelization.
 50  numSubprocesses = multiprocessing.cpu_count() if maxProcesses==None else int(maxProcesses)
 51  assert numSubprocesses >= 1, "Specification of %i CPUs to calculate similarity matrix." % numSubprocesses
 52  dataManager = multiprocessing.Manager()
 53  sharedFeatureData = dataManager.dict( featureData )
 54  resultQueue = multiprocessing.Queue() 
 55  # Assign jobs evenly through number of processors available.
 56  jobList = [ list() for i in range(numSubprocesses) ]
 57  calculationNumber = 0 # Will hold total number of results stored.
 58  if calculateSymmetric: # Perform calculations with n(n+1)/2 pairs, instead of n^2 pairs.
 59   remainingIDs = list( useRowIDs )
 60   while remainingIDs:
 61    firstID = remainingIDs[0]
 62    for secondID in remainingIDs:
 63     jobList[ calculationNumber % numSubprocesses ].append( (firstID, secondID) )
 64     calculationNumber += 1
 65    remainingIDs.remove( firstID )
 66  else: # Straight processing one at a time.
 67   for rowID in useRowIDs:
 68    for colID in useColIDs:
 69     jobList[ calculationNumber % numSubprocesses ].append( (rowID, colID) )
 70     calculationNumber += 1
 71     
 72  verbose and debugMessage( "Completed setup of job distribution: %s." % [len(js) for js in jobList] )
 73  # Define a function to perform calculation and store results
 74  def runJobs( scoreFunc, pairs, featureData, resultQueue ):
 75   for pair in pairs:
 76    score = scoreFunc( featureData[pair[0]], featureData[pair[1]] )
 77    resultQueue.put( ( pair, score ) )
 78   verbose and debugMessage( "%s: completed all calculations." % multiprocessing.current_process().name )
 79   
 80   
 81  # Create processes to perform parallelized computing.
 82  processes = list()
 83  for num in range(numSubprocesses):
 84   processes.append( multiprocessing.Process( target=runJobs,
 85                                              args=( fvSimScorer, jobList[num], sharedFeatureData, resultQueue ) ) )
 86  # Launch processes and wait for them to all complete.
 87  import Queue # For Queue.Empty exception.
 88  for p in processes:
 89   p.start()
 90  assignmentsCompleted = 0
 91  while assignmentsCompleted < calculationNumber:
 92   numActive = [ p.is_alive() for p in processes ].count( True )
 93   verbose and debugMessage( "%i/%i complete; Active processes: %i" % \
 94               ( assignmentsCompleted, calculationNumber, numActive ) )
 95   while True: # Empty queue immediately to avoid underlying pipe/socket implementation from hanging.
 96    try: 
 97     pair, score = resultQueue.get( block=False )
 98     resultMatrix[ pair[0], pair[1] ] = score
 99     assignmentsCompleted += 1
100     if calculateSymmetric:
101      resultMatrix[ pair[1], pair[0] ] = score
102    except Queue.Empty:
103     break 
104   if numActive == 0: finished = True
105   else:
106    time.sleep( processCheckTime )
107  # Result queue emptied and no active processes remaining - completed calculations.
108  return resultMatrix
109 ## end of createSimilarityMatrix()

Строки 36-47 - это просто предварительный материал, связанный с определением проблемы, которое было частью исходного вопроса.Настройка многопроцессорной обработки для обхода GIL cPython находится в строках 49-56, а строки 57-70 используются для равномерного создания подразделенных задач.Код в строках 57-70 используется вместо itertools.product, потому что, когда список идентификаторов строк / столбцов достигает 40 000 или около того, продукт в итоге занимает огромное количество памяти.

Фактические вычисления, которые должны бытьвыполняется в строках 74-78, и здесь используются общий словарь ID-> векторных записей и общая очередь результатов.

Строки 81-85 устанавливают фактические объекты процесса, хотя они на самом деле не были запущенывсе же.

В моей первой попытке (не показанной здесь), код "try ... resultQueue.get () и назначить кроме ..." фактически находился вне внешнего цикла управления (хотя не всерасчеты завершены).Когда я запускал эту версию кода на модульном тесте матрицы 9x9, проблем не было.Однако, увеличившись до 200x200 или больше, я обнаружил, что этот код завис, несмотря на то, что он ничего не менял в коде между выполнениями.

Согласно этому обсуждению (http://bugs.python.org/issue8426) и официальной документации по многопроцессорности, использованиеmultiprocess.Queue может зависнуть, если базовая реализация не имеет очень большого размера канала / сокета. Поэтому код, приведенный здесь как мое решение, периодически очищает очередь при проверке завершения процессов (см. строки 91-106), так чтодочерние процессы могут продолжать вносить в него новые результаты и избегать переполнения канала.

Когда я тестировал код на больших матрицах 1000x1000, я заметил, что код вычисления закончился намного раньше кода назначения очереди и матрицы.Используя cProfile, я обнаружил, что одним узким местом был интервал опроса по умолчанию processCheckTime = 1.0 (строка 23), и понижение этого значения улучшило скорость результатов (примеры синхронизации см. В нижней части поста). Это может быть полезной информацией для других людей.Новое в многопроцессорности в Python.

В целом, это, вероятно, не лучшая возможная реализация, но она обеспечивает отправную точку для дальнейшей оптимизации.Как часто говорят, оптимизация посредством распараллеливания требует надлежащего анализа и обдумывания.

Примеры синхронизации, все с 8 ЦП.

200x200 (20100 вычислений / назначений)

t = 1,0: время выполнения 18 с

t = 0,01: время выполнения 3 с

500x500 (125250 вычислений / назначений)

t = 1,0: время выполнения 86 с

t = 0,01: время выполнения 23 с

На случай, если кто-то захочет скопировать и вставить код, вот вам юнит-тест, который я использовал для части разработки.Очевидно, что помеченного кода класса матрицы здесь нет, и код считывателя / счетчика отпечатков пальцев не включен (хотя его довольно просто свернуть самостоятельно).Конечно, я тоже рад поделиться этим кодом, если бы кто-то помог.

112 def unitTest():
113  import cStringIO, os
114  from fingerprintReader import MismatchKernelReader
115  from fingerprintScorers import FeatureVectorLinearKernel
116  exampleData = cStringIO.StringIO() # 9 examples from GPCR (3,1)-mismatch descriptors, first 10 columns.
117  exampleData.write( ",AAA,AAC,AAD,AAE,AAF,AAG,AAH,AAI,AAK"  + os.linesep )
118  exampleData.write( "TS1R2_HUMAN,5,2,3,6,8,6,6,7,4" + os.linesep )
119  exampleData.write( "SSR1_HUMAN,11,6,5,7,4,7,4,7,9" + os.linesep )
120  exampleData.write( "OXYR_HUMAN,27,13,14,14,15,14,11,16,14" + os.linesep )
121  exampleData.write( "ADA1A_HUMAN,7,3,5,4,5,7,3,8,4" + os.linesep )
122  exampleData.write( "TA2R_HUMAN,16,6,7,8,9,10,6,6,6" + os.linesep )
123  exampleData.write( "OXER1_HUMAN,10,6,5,7,11,9,5,10,6" + os.linesep )
124  exampleData.write( "NPY1R_HUMAN,3,3,0,2,3,1,0,6,2" + os.linesep )
125  exampleData.write( "NPSR1_HUMAN,0,1,1,0,3,0,0,6,2" + os.linesep )
126  exampleData.write( "HRH3_HUMAN,16,9,9,13,14,14,9,11,9" + os.linesep )
127  exampleData.write( "HCAR2_HUMAN,3,1,3,2,5,1,1,6,2" )
128  columnIDs = ( "TS1R2_HUMAN", "SSR1_HUMAN", "OXYR_HUMAN", "ADA1A_HUMAN", "TA2R_HUMAN", "OXER1_HUMAN",
129                "NPY1R_HUMAN", "NPSR1_HUMAN", "HRH3_HUMAN", "HCAR2_HUMAN", )
130  m = createSimilarityMatrix( exampleData, MismatchKernelReader, FeatureVectorLinearKernel, columnIDs,
131                              verbose=True, )
132  m.SetOutputPrecision( 6 )
133  print m
134 
135 ## end of unitTest()
...