Почему кривая анализа моего теста на перестановку не является гладкой? - PullRequest
5 голосов
/ 21 марта 2019

Я использую тест перестановки (извлечение случайных подвыборок), чтобы проверить разницу между двумя экспериментами. Каждый эксперимент проводился 100 раз (= 100 реплик каждого). Каждая реплика состоит из 801 точек измерения с течением времени. Теперь я хотел бы выполнить своего рода перестановку (или загрузку), чтобы проверить, сколько реплик за эксперимент (и сколько (временных) точек измерения) мне нужно, чтобы получить определенный уровень надежности.

Для этой цели я написал код, из которого я извлек минимальный рабочий пример (со множеством жестко запрограммированных вещей) (см. Ниже). Входные данные генерируются как случайные числа. Здесь np.random.rand (100, 801) для 100 реплик и 801 временных точек.

Этот код работает в принципе, однако полученные кривые иногда не плавно падают, как можно было бы ожидать при выборе случайных подвыборок 5000 раз. Вот вывод кода ниже:

Result of the code below

Видно, что на 2 из оси X есть пик, которого там быть не должно. Если я изменю случайное начальное число с 52389 на 324235, оно исчезнет, ​​и кривая станет гладкой. Кажется, что-то не так с выбором случайных чисел?

Почему это так? У меня есть семантически подобный код в Matlab, и там кривые абсолютно гладкие уже при 1000 перестановках (здесь 5000).

Есть ли у меня ошибка кодирования или генератор случайных чисел не работает?

Кто-нибудь видит проблему здесь?

import matplotlib.pyplot as plt
import numpy as np
from multiprocessing import current_process, cpu_count, Process, Queue
import matplotlib.pylab as pl


def groupDiffsInParallel (queue, d1, d2, nrOfReplicas, nrOfPermuts, timesOfInterestFramesIter):

    allResults = np.zeros([nrOfReplicas, nrOfPermuts])  # e.g. 100 x 3000
    for repsPerGroupIdx in range(1, nrOfReplicas + 1):
        for permutIdx in range(nrOfPermuts):
            d1TimeCut = d1[:, 0:int(timesOfInterestFramesIter)]
            d1Idxs = np.random.randint(0, nrOfReplicas, size=repsPerGroupIdx)
            d1Sel = d1TimeCut[d1Idxs, :]
            d1Mean = np.mean(d1Sel.flatten())

            d2TimeCut = d2[:, 0:int(timesOfInterestFramesIter)]
            d2Idxs = np.random.randint(0, nrOfReplicas, size=repsPerGroupIdx)
            d2Sel = d2TimeCut[d2Idxs, :]
            d2Mean = np.mean(d2Sel.flatten())

            diff = d1Mean - d2Mean

            allResults[repsPerGroupIdx - 1, permutIdx] = np.abs(diff)

    queue.put(allResults)



def evalDifferences_parallel (d1, d2):
    # d1 and d2 are of size reps x time (e.g. 100x801)

    nrOfReplicas = d1.shape[0]
    nrOfFrames = d1.shape[1]
    timesOfInterestNs = [0.25, 0.5, 1, 2, 3, 4, 5, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100]  # 17
    nrOfTimesOfInterest = len(timesOfInterestNs)
    framesPerNs = (nrOfFrames-1)/100  # sim time == 100 ns
    timesOfInterestFrames = [x*framesPerNs for x in timesOfInterestNs]

    nrOfPermuts = 5000

    allResults = np.zeros([nrOfTimesOfInterest, nrOfReplicas, nrOfPermuts]) # e.g. 17 x 100 x 3000
    nrOfProcesses = cpu_count()
    print('{} cores available'.format(nrOfProcesses))
    queue = Queue()
    jobs = []
    print('Starting ...')

    # use one process for each time cut
    for timesOfInterestFramesIterIdx, timesOfInterestFramesIter in enumerate(timesOfInterestFrames):
        p = Process(target=groupDiffsInParallel, args=(queue, d1, d2, nrOfReplicas, nrOfPermuts, timesOfInterestFramesIter))
        p.start()
        jobs.append(p)
        print('Process {} started work on time \"{} ns\"'.format(timesOfInterestFramesIterIdx, timesOfInterestNs[timesOfInterestFramesIterIdx]), end='\n', flush=True)
    # collect the results
    for timesOfInterestFramesIterIdx, timesOfInterestFramesIter in enumerate(timesOfInterestFrames):
        oneResult = queue.get()
        allResults[timesOfInterestFramesIterIdx, :, :] = oneResult
        print('Process number {} returned the results.'.format(timesOfInterestFramesIterIdx), end='\n', flush=True)
    # hold main thread and wait for the child process to complete. then join back the resources in the main thread
    for proc in jobs:
        proc.join()
    print("All parallel done.")

    allResultsMeanOverPermuts = allResults.mean(axis=2)  # size: 17 x 100

    replicaNumbersToPlot = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100])
    replicaNumbersToPlot -= 1  # zero index!
    colors = pl.cm.jet(np.linspace(0, 1, len(replicaNumbersToPlot)))

    ctr = 0

    f, ax = plt.subplots(nrows=2, ncols=2, figsize=(12, 12))
    axId = (1, 0)
    for lineIdx in replicaNumbersToPlot:
        lineData = allResultsMeanOverPermuts[:, lineIdx]
        ax[axId].plot(lineData, ".-", color=colors[ctr], linewidth=0.5, label="nReps="+str(lineIdx+1))
        ctr+=1

    ax[axId].set_xticks(range(nrOfTimesOfInterest))  # careful: this is not the same as plt.xticks!!
    ax[axId].set_xticklabels(timesOfInterestNs)
    ax[axId].set_xlabel("simulation length taken into account")
    ax[axId].set_ylabel("average difference between mean values boot strapping samples")
    ax[axId].set_xlim([ax[axId].get_xlim()[0], ax[axId].get_xlim()[1]+1])  # increase x max by 2

    plt.show()


##### MAIN ####
np.random.seed(83737)  # some number for reproducibility
d1 = np.random.rand(100, 801)
d2 = np.random.rand(100, 801)

np.random.seed(52389)  # if changed to 324235 the peak is gone
evalDifferences_parallel(d1, d2)

------------- ОБНОВЛЕНИЕ ---------------

Изменение генератора случайных чисел с numpy на «от случайного импорта import randint» не решает проблему:

от

d1Idxs = np.random.randint(0, nrOfReplicas, size=repsPerGroupIdx)
d2Idxs = np.random.randint(0, nrOfReplicas, size=repsPerGroupIdx)

до:

d1Idxs = [randint(0, nrOfReplicas-1) for p in range(repsPerGroupIdx)]
d2Idxs = [randint(0, nrOfReplicas-1) for p in range(repsPerGroupIdx)]

--- ОБНОВЛЕНИЕ 2 ---

timesOfInterestNs можно просто установить на:

timesOfInterestNs = [0.25, 0.5, 1, 2, 3, 4, 5, 10, 20, 30, 40, 50]

для ускорения работы на машинах с меньшим количеством ядер.

--- ОБНОВЛЕНИЕ 3 ---

Повторная инициализация генератора случайных начальных чисел в каждом дочернем процессе ( Случайное начальное число - репликация на дочерние процессы ) также не решает проблему:

pid = str(current_process())
pid = int(re.split("(\W)", pid)[6])
ms = int(round(time.time() * 1000))
mySeed = np.mod(ms, 4294967295)
mySeed = mySeed + 25000 * pid + 100 * pid + pid
mySeed = np.mod(mySeed, 4294967295)
np.random.seed(seed=mySeed)

--- ОБНОВЛЕНИЕ 4 --- На машине с Windows вам понадобится:

if __name__ == '__main__':

, чтобы избежать рекурсивного создания подпроцессов (и сбоя).

Ответы [ 2 ]

6 голосов
/ 02 мая 2019

Полагаю, это классическая многопроцессорная ошибка.Ничто не гарантирует, что процессы будут завершены в том же порядке, в котором они были запущены.Это означает, что вы не можете быть уверены, что инструкция allResults[timesOfInterestFramesIterIdx, :, :] = oneResult сохранит результат процесса 'timesOfInterestFramesIterIdx' в местоположении 'timesOfInterestFramesIterIdx' в allResults.Чтобы было понятнее, допустим, что 'timesOfInterestFramesIterIdx' равен 2, тогда у вас нет абсолютно никаких гарантий, что oneResult является результатом процесса 2.

Я реализовал очень быстрое исправление ниже.Идея состоит в том, чтобы отслеживать порядок запуска процессов, добавив дополнительный аргумент в groupDiffsInParallel, который затем сохраняется в очереди и, таким образом, служит идентификатором процесса при получении результатов.

import matplotlib.pyplot as plt
import numpy as np
from multiprocessing import cpu_count, Process, Queue
import matplotlib.pylab as pl


def groupDiffsInParallel(queue, d1, d2, nrOfReplicas, nrOfPermuts,
                         timesOfInterestFramesIter,
                         timesOfInterestFramesIterIdx):

    allResults = np.zeros([nrOfReplicas, nrOfPermuts])  # e.g. 100 x 3000
    for repsPerGroupIdx in range(1, nrOfReplicas + 1):
        for permutIdx in range(nrOfPermuts):
            d1TimeCut = d1[:, 0:int(timesOfInterestFramesIter)]
            d1Idxs = np.random.randint(0, nrOfReplicas, size=repsPerGroupIdx)
            d1Sel = d1TimeCut[d1Idxs, :]
            d1Mean = np.mean(d1Sel.flatten())

            d2TimeCut = d2[:, 0:int(timesOfInterestFramesIter)]
            d2Idxs = np.random.randint(0, nrOfReplicas, size=repsPerGroupIdx)
            d2Sel = d2TimeCut[d2Idxs, :]
            d2Mean = np.mean(d2Sel.flatten())

            diff = d1Mean - d2Mean

            allResults[repsPerGroupIdx - 1, permutIdx] = np.abs(diff)

    queue.put({'allResults': allResults,
               'number': timesOfInterestFramesIterIdx})


def evalDifferences_parallel (d1, d2):
    # d1 and d2 are of size reps x time (e.g. 100x801)

    nrOfReplicas = d1.shape[0]
    nrOfFrames = d1.shape[1]
    timesOfInterestNs = [0.25, 0.5, 1, 2, 3, 4, 5, 10, 20, 30, 40, 50, 60, 70,
                         80, 90, 100]  # 17
    nrOfTimesOfInterest = len(timesOfInterestNs)
    framesPerNs = (nrOfFrames-1)/100  # sim time == 100 ns
    timesOfInterestFrames = [x*framesPerNs for x in timesOfInterestNs]

    nrOfPermuts = 5000

    allResults = np.zeros([nrOfTimesOfInterest, nrOfReplicas,
                           nrOfPermuts])  # e.g. 17 x 100 x 3000
    nrOfProcesses = cpu_count()
    print('{} cores available'.format(nrOfProcesses))
    queue = Queue()
    jobs = []
    print('Starting ...')

    # use one process for each time cut
    for timesOfInterestFramesIterIdx, timesOfInterestFramesIter \
            in enumerate(timesOfInterestFrames):
        p = Process(target=groupDiffsInParallel,
                    args=(queue, d1, d2, nrOfReplicas, nrOfPermuts,
                          timesOfInterestFramesIter,
                          timesOfInterestFramesIterIdx))
        p.start()
        jobs.append(p)
        print('Process {} started work on time \"{} ns\"'.format(
            timesOfInterestFramesIterIdx,
            timesOfInterestNs[timesOfInterestFramesIterIdx]),
              end='\n', flush=True)
    # collect the results
    resultdict = {}
    for timesOfInterestFramesIterIdx, timesOfInterestFramesIter \
            in enumerate(timesOfInterestFrames):
        resultdict.update(queue.get())
        allResults[resultdict['number'], :, :] = resultdict['allResults']
        print('Process number {} returned the results.'.format(
            resultdict['number']), end='\n', flush=True)
    # hold main thread and wait for the child process to complete. then join
    # back the resources in the main thread
    for proc in jobs:
        proc.join()
    print("All parallel done.")

    allResultsMeanOverPermuts = allResults.mean(axis=2)  # size: 17 x 100

    replicaNumbersToPlot = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 20, 30, 40,
                                     50, 60, 70, 80, 90, 100])
    replicaNumbersToPlot -= 1  # zero index!
    colors = pl.cm.jet(np.linspace(0, 1, len(replicaNumbersToPlot)))

    ctr = 0

    f, ax = plt.subplots(nrows=2, ncols=2, figsize=(12, 12))
    axId = (1, 0)
    for lineIdx in replicaNumbersToPlot:
        lineData = allResultsMeanOverPermuts[:, lineIdx]
        ax[axId].plot(lineData, ".-", color=colors[ctr], linewidth=0.5,
                      label="nReps="+str(lineIdx+1))
        ctr += 1

    ax[axId].set_xticks(range(nrOfTimesOfInterest))
    # careful: this is not the same as plt.xticks!!
    ax[axId].set_xticklabels(timesOfInterestNs)
    ax[axId].set_xlabel("simulation length taken into account")
    ax[axId].set_ylabel("average difference between mean values boot "
                        + "strapping samples")
    ax[axId].set_xlim([ax[axId].get_xlim()[0], ax[axId].get_xlim()[1]+1])
    # increase x max by 2

    plt.show()


# #### MAIN ####
np.random.seed(83737)  # some number for reproducibility
d1 = np.random.rand(100, 801)
d2 = np.random.rand(100, 801)

np.random.seed(52389)  # if changed to 324235 the peak is gone
evalDifferences_parallel(d1, d2)

Это вывод, который я получаю, который, очевидно, показывает, что порядок, в котором возвращаются процессы, перемешивается по сравнению с порядком запуска.

20 cores available
Starting ...
Process 0 started work on time "0.25 ns"
Process 1 started work on time "0.5 ns"
Process 2 started work on time "1 ns"
Process 3 started work on time "2 ns"
Process 4 started work on time "3 ns"
Process 5 started work on time "4 ns"
Process 6 started work on time "5 ns"
Process 7 started work on time "10 ns"
Process 8 started work on time "20 ns"
Process 9 started work on time "30 ns"
Process 10 started work on time "40 ns"
Process 11 started work on time "50 ns"
Process 12 started work on time "60 ns"
Process 13 started work on time "70 ns"
Process 14 started work on time "80 ns"
Process 15 started work on time "90 ns"
Process 16 started work on time "100 ns"
Process number 3 returned the results.
Process number 0 returned the results.
Process number 4 returned the results.
Process number 7 returned the results.
Process number 1 returned the results.
Process number 2 returned the results.
Process number 5 returned the results.
Process number 8 returned the results.
Process number 6 returned the results.
Process number 9 returned the results.
Process number 10 returned the results.
Process number 11 returned the results.
Process number 12 returned the results.
Process number 13 returned the results.
Process number 14 returned the results.
Process number 15 returned the results.
Process number 16 returned the results.
All parallel done.

И полученная цифра.

Correct output

4 голосов
/ 30 апреля 2019

не уверен, что вы все еще зациклены на этой проблеме, но я просто запустил ваш код на своем компьютере (MacBook Pro (15 дюймов, 2018)) в Jupyter 4.4.0, и мои графики сглажены с тем же начальным числомзначения, которые вы изначально разместили:

##### MAIN ####
np.random.seed(83737)  # some number for reproducibility
d1 = np.random.rand(100, 801)
d2 = np.random.rand(100, 801)

np.random.seed(52389)  # if changed to 324235 the peak is gone
evalDifferences_parallel(d1, d2) 

Plot

Возможно, в вашем коде нет ничего плохого и ничего особенного в семени 324235, и вам просто нужно проверить дваждыверсии вашего модуля, так как любые изменения в исходном коде, сделанные в более поздних версиях, могут повлиять на ваши результаты.Для справки я использую numpy 1.15.4, matplotlib 3.0.2 и multiprocessing 2.6.2.1.

...