Спасибо всем за ваши ответы.
Я создал решение (а не "решение") предложенной проблемы, и, поскольку другие могут найти его полезным, я публикую код здесь.Мое решение - гибрид вариантов 1 и 3, предложенных Адамом Матаном.Код содержит номера строк из моего сеанса vi, что поможет в обсуждении ниже.
12 # System libraries needed by this module.
13 import numpy, multiprocessing, time
14
15 # Third-party libraries needed by this module.
16 import labeledMatrix
17
18 # ----- Begin code for this module. -----
19 from commonFunctions import debugMessage
20
21 def createSimilarityMatrix( fvFileHandle, fvFileParser, fvSimScorer, colIDs, rowIDs=None,
22 exceptionType=ValueError, useNumType=numpy.float, verbose=False,
23 maxProcesses=None, processCheckTime=1.0 ):
24 """Create a labeled similarity matrix from vectorial data in [fvFileHandle] that can be
25 parsed by [fvFileParser].
26 [fvSimScorer] should be a function that can return a floating point value for a pair of vectors.
27
28 If the matrix [rowIDs] are not specified, they will be the same as the [colIDs].
29
30 [exceptionType] will be raised when a row or column ID cannot be found in the vectorial data.
31 [maxProcesses] specifies the number of CPUs to use for calculation; default value is all available CPUs.
32 [processCheckTime] is the interval for checking activity of CPUs (if completed calculation or not).
33
34 Return: a LabeledNumericMatrix with corresponding row and column IDs."""
35
36 # Setup row/col ID information.
37 useColIDs = list( colIDs )
38 useRowIDs = rowIDs or useColIDs
39 featureData = fvFileParser( fvFileHandle, retainIDs=(useColIDs+useRowIDs) )
40 verbose and debugMessage( "Retrieved %i feature vectors from FV file." % len(featureData) )
41 featureIDs = featureData.keys()
42 absentIDs = [ ID for ID in set(useColIDs + useRowIDs) if ID not in featureIDs ]
43 if absentIDs:
44 raise exceptionType, "IDs %s not found in feature vector file." % absentIDs
45 # Otherwise, proceed to creation of matrix.
46 resultMatrix = labeledMatrix.LabeledNumericMatrix( useRowIDs, useColIDs, numType=useNumType )
47 calculateSymmetric = True if set( useRowIDs ) == set( useColIDs ) else False
48
49 # Setup data structures needed for parallelization.
50 numSubprocesses = multiprocessing.cpu_count() if maxProcesses==None else int(maxProcesses)
51 assert numSubprocesses >= 1, "Specification of %i CPUs to calculate similarity matrix." % numSubprocesses
52 dataManager = multiprocessing.Manager()
53 sharedFeatureData = dataManager.dict( featureData )
54 resultQueue = multiprocessing.Queue()
55 # Assign jobs evenly through number of processors available.
56 jobList = [ list() for i in range(numSubprocesses) ]
57 calculationNumber = 0 # Will hold total number of results stored.
58 if calculateSymmetric: # Perform calculations with n(n+1)/2 pairs, instead of n^2 pairs.
59 remainingIDs = list( useRowIDs )
60 while remainingIDs:
61 firstID = remainingIDs[0]
62 for secondID in remainingIDs:
63 jobList[ calculationNumber % numSubprocesses ].append( (firstID, secondID) )
64 calculationNumber += 1
65 remainingIDs.remove( firstID )
66 else: # Straight processing one at a time.
67 for rowID in useRowIDs:
68 for colID in useColIDs:
69 jobList[ calculationNumber % numSubprocesses ].append( (rowID, colID) )
70 calculationNumber += 1
71
72 verbose and debugMessage( "Completed setup of job distribution: %s." % [len(js) for js in jobList] )
73 # Define a function to perform calculation and store results
74 def runJobs( scoreFunc, pairs, featureData, resultQueue ):
75 for pair in pairs:
76 score = scoreFunc( featureData[pair[0]], featureData[pair[1]] )
77 resultQueue.put( ( pair, score ) )
78 verbose and debugMessage( "%s: completed all calculations." % multiprocessing.current_process().name )
79
80
81 # Create processes to perform parallelized computing.
82 processes = list()
83 for num in range(numSubprocesses):
84 processes.append( multiprocessing.Process( target=runJobs,
85 args=( fvSimScorer, jobList[num], sharedFeatureData, resultQueue ) ) )
86 # Launch processes and wait for them to all complete.
87 import Queue # For Queue.Empty exception.
88 for p in processes:
89 p.start()
90 assignmentsCompleted = 0
91 while assignmentsCompleted < calculationNumber:
92 numActive = [ p.is_alive() for p in processes ].count( True )
93 verbose and debugMessage( "%i/%i complete; Active processes: %i" % \
94 ( assignmentsCompleted, calculationNumber, numActive ) )
95 while True: # Empty queue immediately to avoid underlying pipe/socket implementation from hanging.
96 try:
97 pair, score = resultQueue.get( block=False )
98 resultMatrix[ pair[0], pair[1] ] = score
99 assignmentsCompleted += 1
100 if calculateSymmetric:
101 resultMatrix[ pair[1], pair[0] ] = score
102 except Queue.Empty:
103 break
104 if numActive == 0: finished = True
105 else:
106 time.sleep( processCheckTime )
107 # Result queue emptied and no active processes remaining - completed calculations.
108 return resultMatrix
109 ## end of createSimilarityMatrix()
Строки 36-47 - это просто предварительный материал, связанный с определением проблемы, которое было частью исходного вопроса.Настройка многопроцессорной обработки для обхода GIL cPython находится в строках 49-56, а строки 57-70 используются для равномерного создания подразделенных задач.Код в строках 57-70 используется вместо itertools.product, потому что, когда список идентификаторов строк / столбцов достигает 40 000 или около того, продукт в итоге занимает огромное количество памяти.
Фактические вычисления, которые должны бытьвыполняется в строках 74-78, и здесь используются общий словарь ID-> векторных записей и общая очередь результатов.
Строки 81-85 устанавливают фактические объекты процесса, хотя они на самом деле не были запущенывсе же.
В моей первой попытке (не показанной здесь), код "try ... resultQueue.get () и назначить кроме ..." фактически находился вне внешнего цикла управления (хотя не всерасчеты завершены).Когда я запускал эту версию кода на модульном тесте матрицы 9x9, проблем не было.Однако, увеличившись до 200x200 или больше, я обнаружил, что этот код завис, несмотря на то, что он ничего не менял в коде между выполнениями.
Согласно этому обсуждению (http://bugs.python.org/issue8426) и официальной документации по многопроцессорности, использованиеmultiprocess.Queue может зависнуть, если базовая реализация не имеет очень большого размера канала / сокета. Поэтому код, приведенный здесь как мое решение, периодически очищает очередь при проверке завершения процессов (см. строки 91-106), так чтодочерние процессы могут продолжать вносить в него новые результаты и избегать переполнения канала.
Когда я тестировал код на больших матрицах 1000x1000, я заметил, что код вычисления закончился намного раньше кода назначения очереди и матрицы.Используя cProfile, я обнаружил, что одним узким местом был интервал опроса по умолчанию processCheckTime = 1.0 (строка 23), и понижение этого значения улучшило скорость результатов (примеры синхронизации см. В нижней части поста). Это может быть полезной информацией для других людей.Новое в многопроцессорности в Python.
В целом, это, вероятно, не лучшая возможная реализация, но она обеспечивает отправную точку для дальнейшей оптимизации.Как часто говорят, оптимизация посредством распараллеливания требует надлежащего анализа и обдумывания.
Примеры синхронизации, все с 8 ЦП.
200x200 (20100 вычислений / назначений)
t = 1,0: время выполнения 18 с
t = 0,01: время выполнения 3 с
500x500 (125250 вычислений / назначений)
t = 1,0: время выполнения 86 с
t = 0,01: время выполнения 23 с
На случай, если кто-то захочет скопировать и вставить код, вот вам юнит-тест, который я использовал для части разработки.Очевидно, что помеченного кода класса матрицы здесь нет, и код считывателя / счетчика отпечатков пальцев не включен (хотя его довольно просто свернуть самостоятельно).Конечно, я тоже рад поделиться этим кодом, если бы кто-то помог.
112 def unitTest():
113 import cStringIO, os
114 from fingerprintReader import MismatchKernelReader
115 from fingerprintScorers import FeatureVectorLinearKernel
116 exampleData = cStringIO.StringIO() # 9 examples from GPCR (3,1)-mismatch descriptors, first 10 columns.
117 exampleData.write( ",AAA,AAC,AAD,AAE,AAF,AAG,AAH,AAI,AAK" + os.linesep )
118 exampleData.write( "TS1R2_HUMAN,5,2,3,6,8,6,6,7,4" + os.linesep )
119 exampleData.write( "SSR1_HUMAN,11,6,5,7,4,7,4,7,9" + os.linesep )
120 exampleData.write( "OXYR_HUMAN,27,13,14,14,15,14,11,16,14" + os.linesep )
121 exampleData.write( "ADA1A_HUMAN,7,3,5,4,5,7,3,8,4" + os.linesep )
122 exampleData.write( "TA2R_HUMAN,16,6,7,8,9,10,6,6,6" + os.linesep )
123 exampleData.write( "OXER1_HUMAN,10,6,5,7,11,9,5,10,6" + os.linesep )
124 exampleData.write( "NPY1R_HUMAN,3,3,0,2,3,1,0,6,2" + os.linesep )
125 exampleData.write( "NPSR1_HUMAN,0,1,1,0,3,0,0,6,2" + os.linesep )
126 exampleData.write( "HRH3_HUMAN,16,9,9,13,14,14,9,11,9" + os.linesep )
127 exampleData.write( "HCAR2_HUMAN,3,1,3,2,5,1,1,6,2" )
128 columnIDs = ( "TS1R2_HUMAN", "SSR1_HUMAN", "OXYR_HUMAN", "ADA1A_HUMAN", "TA2R_HUMAN", "OXER1_HUMAN",
129 "NPY1R_HUMAN", "NPSR1_HUMAN", "HRH3_HUMAN", "HCAR2_HUMAN", )
130 m = createSimilarityMatrix( exampleData, MismatchKernelReader, FeatureVectorLinearKernel, columnIDs,
131 verbose=True, )
132 m.SetOutputPrecision( 6 )
133 print m
134
135 ## end of unitTest()