Как улучшить производительность фильтрации совпадений временного ряда? - PullRequest
1 голос
/ 11 июня 2019

Я работаю над нестационарными экспериментальными данными по гидродинамике.Мы измерили данные по трем каналам, поэтому выборки не совпадают напрямую (измеряются одновременно).Я хочу отфильтровать их по схеме окна, чтобы получить совпадающие образцы и игнорировать все остальные.

К сожалению, я не могу загрузить исходный набор данных из-за ограничений компании.Но я попытался создать минимальный пример, который генерирует похожий (меньший) набор данных.Исходный набор данных состоит из 500000 значений на канал, каждое из которых отмечено временем прибытия.Совпадение проверяется с помощью этих отметок времени.

Только сейчас я зацикливаюсь на каждом сэмпле с первого канала и смотрю на разницу во времени для других каналов.Если он меньше указанной ширины окна, индекс сохраняется.Вероятно, было бы немного быстрее, если бы я указал интервал, в котором нужно проверять различия (например, 100 или 1000 выборок по соседству).Но данные между каналами могут существенно различаться, поэтому они еще не реализованы.Я предпочитаю избавляться от зацикливания каждого семпла - если это возможно.

def filterCoincidence(df, window = 50e-6):
        '''
        Filters the dataset with arbitrary different data rates on different channels to coincident samples.
        The coincidence is checked with regard to a time window specified as argument.
        '''
        AT_cols = [col for col in df.columns if 'AT' in col]
        if len(AT_cols) == 1:
            print('only one group available')
            return
        used_ix = np.zeros( (df.shape[0], len(AT_cols)))
        used_ix.fill(np.nan)
        for ix, sample in enumerate(df[AT_cols[0]]):
            used_ix[ix, 0] = ix
            test_ix = np.zeros(2)
            for ii, AT_col in enumerate(AT_cols[1:]):
                diff = np.abs(df[AT_col] - sample)
                index = diff[diff <= window].sort_values().index.values
                if len(index) == 0:
                    test_ix[ii] = None
                    continue
                test_ix[ii] = [ix_use if (ix_use not in used_ix[:, ii+1] or ix == 0) else None for ix_use in index][0]
            if not np.any(np.isnan(test_ix)):
                used_ix[ix, 1:] = test_ix
            else:
                used_ix[ix, 1:] = [None, None]
        used_ix = used_ix[~np.isnan(used_ix).any(axis=1)]
        print(used_ix.shape)
        return

no_points = 10000
no_groups = 3
meas_duration = 60
df = pd.DataFrame(np.transpose([np.sort(np.random.rand(no_points)*meas_duration) for _ in range(no_groups)]), columns=['AT {}'.format(i) for i in range(no_groups)])
filterCoincidence(df, window=1e-3)

Есть ли уже реализованный модуль, который может выполнять такую ​​фильтрацию?Однако было бы замечательно, если бы вы могли дать мне несколько советов по повышению производительности кода.

1 Ответ

0 голосов
/ 08 июля 2019

Просто чтобы обновить эту ветку, если у кого-то еще есть подобная проблема. Я думаю, что после нескольких пересмотров кода я нашел правильное решение для этого.

def filterCoincidence(self, AT1, AT2, AT3, window = 0.05e-3):
    '''
    Filters the dataset with arbitrary different data rates on different channels to coincident samples.
    The coincidence is checked with regard to a time window specified as argument.

    - arguments:
        - three times series AT1, AT2 and AT3 (arrival times of particles in my case)
        - window size (50 microseconds as default setting)

    - output: indices of combined samples
    '''

    start_time = datetime.datetime.now()
    AT_list = [AT1, AT2, AT3]

    # take the shortest period of time
    min_EndArrival = np.max(AT_list)
    max_BeginArrival = np.min(AT_list)
    for i, col in enumerate(AT_list):
        min_EndArrival = min(min_EndArrival, np.max(col))
        max_BeginArrival = max(max_BeginArrival, np.min(col))
    for i, col in enumerate(AT_list):
        AT_list[i] = np.delete(AT_list[i], np.where((col < max_BeginArrival - window) | (col > min_EndArrival + window)))

    # get channel with lowest datarate
    num_points = np.zeros(len(AT_list))
    datarate = np.zeros(len(AT_list))

    for i, AT in enumerate(AT_list):
        num_points[i] = AT.shape[0]
        datarate[i] = num_points[i] / (AT[-1]-AT[0])
    used_ref = np.argmin(datarate)

    # process coincidence
    AT_ref_val = AT_list[used_ref]
    AT_list = list(np.delete(AT_list, used_ref))
    overview = np.zeros( (AT_ref_val.shape[0], 3), dtype=int)
    overview[:,0] = np.arange(AT_ref_val.shape[0], dtype=int)
    borders = np.empty(2, dtype=object)
    max_diff = np.zeros(2, dtype=int)
    for i, AT in enumerate(AT_list):
        neighbors_lower = np.searchsorted(AT, AT_ref_val - window, side='left')
        neighbors_upper = np.searchsorted(AT, AT_ref_val + window, side='left')
        borders[i] = np.transpose([neighbors_lower, neighbors_upper])
        coinc_ix = np.where(np.diff(borders[i], axis=1).flatten() != 0)[0]
        max_diff[i] = np.max(np.diff(borders[i], axis=1))
        overview[coinc_ix, i+1] = 1
    use_ix = np.where(~np.any(overview==0, axis=1))
    borders[0] = borders[0][use_ix]
    borders[1] = borders[1][use_ix]
    overview = overview[use_ix]

    # create all possible combinations refer to the reference
    combinations = np.prod(max_diff)
    test = np.empty((overview.shape[0]*combinations, 3), dtype=object)
    for i, [ref_ix, at1, at2] in enumerate(zip(overview[:, 0], borders[0], borders[1])):
        test[i * combinations:i * combinations + combinations, 0] = ref_ix
        at1 = np.arange(at1[0], at1[1])
        at2 = np.arange(at2[0], at2[1])
        test[i*combinations:i*combinations+at1.shape[0]*at2.shape[0],1:] = np.asarray(list(itertools.product(at1, at2)))
    test = test[~np.any(pd.isnull(test), axis=1)]

    # check distances
    ix_ref = test[:,0]
    test = test[:,1:]
    test = np.insert(test, used_ref, ix_ref, axis=1)
    test = test.astype(int)

    AT_list.insert(used_ref, AT_ref_val)
    AT_mat = np.zeros(test.shape)
    for i, AT in enumerate(AT_list):
        AT_mat[:,i] = AT[test[:,i]]

    distances = np.zeros( (test.shape[0], len(list(itertools.combinations(range(3), 2)))))
    for i, AT in enumerate(itertools.combinations(range(3), 2)):
        distances[:,i] = np.abs(AT_mat[:,AT[0]]-AT_mat[:,AT[1]])
    ix = np.where(np.all(distances <= window, axis=1))[0]
    test = test[ix,:]
    distances = distances[ix,:]

    # check duplicates
    # use sum of differences as similarity factor
    dist_sum = np.max(distances, axis=1)
    unique_sorted = np.argsort([np.unique(test[:,i]).shape[0] for i in range(test.shape[1])])[::-1]
    test = np.hstack([test, dist_sum.reshape(-1, 1)])
    test = test[test[:,-1].argsort()]
    for j in unique_sorted:
        _, ix = np.unique(test[:,j], return_index=True)
        test = test[ix, :]
    test = test[:,:3]
    test = test[test[:,used_ref].argsort()]
    # check that all values are after each other
    ix = np.where(np.any(np.diff(test, axis=0) > 0, axis=1))[0]
    ix = np.append(ix, test.shape[0]-1)
    test = test[ix,:]
    print('{} coincident samples obtained in {}.'.format(test.shape[0], datetime.datetime.now()-start_time))
    return test

Я уверен, что есть лучшее решение, но для меня оно работает сейчас. И я знаю, что имена переменных должны определенно выбираться с большей ясностью (например, test), но я приведу в порядок свой код в конце своей магистерской диссертации ... возможно: -)

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...