Я использую тест перестановки (извлечение случайных подвыборок), чтобы проверить разницу между двумя экспериментами. Каждый эксперимент проводился 100 раз (= 100 реплик каждого). Каждая реплика состоит из 801 точек измерения с течением времени. Теперь я хотел бы выполнить своего рода перестановку (или загрузку), чтобы проверить, сколько реплик за эксперимент (и сколько (временных) точек измерения) мне нужно, чтобы получить определенный уровень надежности.
Для этой цели я написал код, из которого я извлек минимальный рабочий пример (со множеством жестко запрограммированных вещей) (см. Ниже). Входные данные генерируются как случайные числа. Здесь np.random.rand (100, 801) для 100 реплик и 801 временных точек.
Этот код работает в принципе, однако полученные кривые иногда не плавно падают, как можно было бы ожидать при выборе случайных подвыборок 5000 раз. Вот вывод кода ниже:
Видно, что на 2 из оси X есть пик, которого там быть не должно. Если я изменю случайное начальное число с 52389 на 324235, оно исчезнет, и кривая станет гладкой. Кажется, что-то не так с выбором случайных чисел?
Почему это так? У меня есть семантически подобный код в Matlab, и там кривые абсолютно гладкие уже при 1000 перестановках (здесь 5000).
Есть ли у меня ошибка кодирования или генератор случайных чисел не работает?
Кто-нибудь видит проблему здесь?
import matplotlib.pyplot as plt
import numpy as np
from multiprocessing import current_process, cpu_count, Process, Queue
import matplotlib.pylab as pl
def groupDiffsInParallel (queue, d1, d2, nrOfReplicas, nrOfPermuts, timesOfInterestFramesIter):
allResults = np.zeros([nrOfReplicas, nrOfPermuts]) # e.g. 100 x 3000
for repsPerGroupIdx in range(1, nrOfReplicas + 1):
for permutIdx in range(nrOfPermuts):
d1TimeCut = d1[:, 0:int(timesOfInterestFramesIter)]
d1Idxs = np.random.randint(0, nrOfReplicas, size=repsPerGroupIdx)
d1Sel = d1TimeCut[d1Idxs, :]
d1Mean = np.mean(d1Sel.flatten())
d2TimeCut = d2[:, 0:int(timesOfInterestFramesIter)]
d2Idxs = np.random.randint(0, nrOfReplicas, size=repsPerGroupIdx)
d2Sel = d2TimeCut[d2Idxs, :]
d2Mean = np.mean(d2Sel.flatten())
diff = d1Mean - d2Mean
allResults[repsPerGroupIdx - 1, permutIdx] = np.abs(diff)
queue.put(allResults)
def evalDifferences_parallel (d1, d2):
# d1 and d2 are of size reps x time (e.g. 100x801)
nrOfReplicas = d1.shape[0]
nrOfFrames = d1.shape[1]
timesOfInterestNs = [0.25, 0.5, 1, 2, 3, 4, 5, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100] # 17
nrOfTimesOfInterest = len(timesOfInterestNs)
framesPerNs = (nrOfFrames-1)/100 # sim time == 100 ns
timesOfInterestFrames = [x*framesPerNs for x in timesOfInterestNs]
nrOfPermuts = 5000
allResults = np.zeros([nrOfTimesOfInterest, nrOfReplicas, nrOfPermuts]) # e.g. 17 x 100 x 3000
nrOfProcesses = cpu_count()
print('{} cores available'.format(nrOfProcesses))
queue = Queue()
jobs = []
print('Starting ...')
# use one process for each time cut
for timesOfInterestFramesIterIdx, timesOfInterestFramesIter in enumerate(timesOfInterestFrames):
p = Process(target=groupDiffsInParallel, args=(queue, d1, d2, nrOfReplicas, nrOfPermuts, timesOfInterestFramesIter))
p.start()
jobs.append(p)
print('Process {} started work on time \"{} ns\"'.format(timesOfInterestFramesIterIdx, timesOfInterestNs[timesOfInterestFramesIterIdx]), end='\n', flush=True)
# collect the results
for timesOfInterestFramesIterIdx, timesOfInterestFramesIter in enumerate(timesOfInterestFrames):
oneResult = queue.get()
allResults[timesOfInterestFramesIterIdx, :, :] = oneResult
print('Process number {} returned the results.'.format(timesOfInterestFramesIterIdx), end='\n', flush=True)
# hold main thread and wait for the child process to complete. then join back the resources in the main thread
for proc in jobs:
proc.join()
print("All parallel done.")
allResultsMeanOverPermuts = allResults.mean(axis=2) # size: 17 x 100
replicaNumbersToPlot = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100])
replicaNumbersToPlot -= 1 # zero index!
colors = pl.cm.jet(np.linspace(0, 1, len(replicaNumbersToPlot)))
ctr = 0
f, ax = plt.subplots(nrows=2, ncols=2, figsize=(12, 12))
axId = (1, 0)
for lineIdx in replicaNumbersToPlot:
lineData = allResultsMeanOverPermuts[:, lineIdx]
ax[axId].plot(lineData, ".-", color=colors[ctr], linewidth=0.5, label="nReps="+str(lineIdx+1))
ctr+=1
ax[axId].set_xticks(range(nrOfTimesOfInterest)) # careful: this is not the same as plt.xticks!!
ax[axId].set_xticklabels(timesOfInterestNs)
ax[axId].set_xlabel("simulation length taken into account")
ax[axId].set_ylabel("average difference between mean values boot strapping samples")
ax[axId].set_xlim([ax[axId].get_xlim()[0], ax[axId].get_xlim()[1]+1]) # increase x max by 2
plt.show()
##### MAIN ####
np.random.seed(83737) # some number for reproducibility
d1 = np.random.rand(100, 801)
d2 = np.random.rand(100, 801)
np.random.seed(52389) # if changed to 324235 the peak is gone
evalDifferences_parallel(d1, d2)
------------- ОБНОВЛЕНИЕ ---------------
Изменение генератора случайных чисел с numpy на «от случайного импорта import randint» не решает проблему:
от
d1Idxs = np.random.randint(0, nrOfReplicas, size=repsPerGroupIdx)
d2Idxs = np.random.randint(0, nrOfReplicas, size=repsPerGroupIdx)
до:
d1Idxs = [randint(0, nrOfReplicas-1) for p in range(repsPerGroupIdx)]
d2Idxs = [randint(0, nrOfReplicas-1) for p in range(repsPerGroupIdx)]
--- ОБНОВЛЕНИЕ 2 ---
timesOfInterestNs
можно просто установить на:
timesOfInterestNs = [0.25, 0.5, 1, 2, 3, 4, 5, 10, 20, 30, 40, 50]
для ускорения работы на машинах с меньшим количеством ядер.
--- ОБНОВЛЕНИЕ 3 ---
Повторная инициализация генератора случайных начальных чисел в каждом дочернем процессе ( Случайное начальное число - репликация на дочерние процессы ) также не решает проблему:
pid = str(current_process())
pid = int(re.split("(\W)", pid)[6])
ms = int(round(time.time() * 1000))
mySeed = np.mod(ms, 4294967295)
mySeed = mySeed + 25000 * pid + 100 * pid + pid
mySeed = np.mod(mySeed, 4294967295)
np.random.seed(seed=mySeed)
--- ОБНОВЛЕНИЕ 4 ---
На машине с Windows вам понадобится:
if __name__ == '__main__':
, чтобы избежать рекурсивного создания подпроцессов (и сбоя).