Агрегирование результатов в Pandas после использования mpi4py - PullRequest
0 голосов
/ 23 октября 2019

Наконец, вот мой первый вопрос по StackOF:

В качестве проекта для Uni я пытаюсь написать код для KMeans с нуля, а затем параллельно запускать различные повторы со случайными центрами запуска, используя mpi4py. ,

Вот код:

#!/usr/bin/env python
# coding: utf-8

# In[3]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from mpi4py import MPI
# import statistics as stat

comm=MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
print('no of processors is', size)
print('this is the process #', rank)
df = pd.read_csv('data.dat',
                   sep='   ',
                   header=None,
                   index_col=0, engine='python' )

n_mus = [1, 2, 4, 12]  # 100]#, 1000]
cost_k = []
k_vals = range(1, 5, 2)
# k_vals = range(1, 30, 6)

for orig_n_mu in n_mus:
    n_mu = orig_n_mu//size
    if rank in range(orig_n_mu%size):
        n_mu += 1
    for k in k_vals:
        cost_n = []
        for n in range(1, n_mu + 1):
            np.random.seed(n * k + k)
            kx = np.random.uniform(df[1].min(), df[1].max(), k)
            np.random.seed(n * k + k + 1)
            ky = np.random.uniform(df[2].min(), df[2].max(), k)
            manh = pd.DataFrame()
            for c in range(k):
                manh[c] = abs(df[1] - kx[c]) + abs(df[2] - ky[c])
            df['center'] = manh.idxmin(axis='columns')
            kx = df.groupby('center').mean()[1]
            ky = df.groupby('center').mean()[2]
            if df.center.unique().shape[0] != k:
                print('not all centers took up clusters at the number', n,
                      'repetition')
                print('the current number of clusters is:',
                      df.center.unique().shape[0], 'instead of', k)
            diff = 10
            while diff > 1e-4:
                cost = manh.min(axis=1).mean()
                for c in df.center.unique():
                    manh[c] = abs(df[1] - kx[c]) + abs(df[2] - ky[c])
                df['center'] = manh.idxmin(axis='columns')
                kx = df.groupby('center').mean()[1]
                ky = df.groupby('center').mean()[2]
                new_cost = manh.min(axis=1).mean()
                diff = cost - new_cost
            cost_n.append(new_cost)
        cost_k.append([k, rank, n_mu, orig_n_mu, cost_n])
print('process #', rank, 'is done here')
all_cost = comm.gather(cost_k, root = 0)
if (rank == 0):
    print('check point #1')
    all_cost = np.reshape(all_cost, newshape=(-1,len(cost_k[0])))
    print('the shape of all cost is', all_cost.shape)
    res = pd.DataFrame(all_cost, columns=['k_val', 'rank', 'n_mu', 'orig_n_mu','cost_res'])
    noruns = (res.n_mu == 0)
    res = res[~noruns].copy()
    res.reset_index(inplace=True, drop=True)
    print('check point #2')
    cost_funcs = pd.DataFrame(res.cost_res.to_list())
    print('check point #3')
    km_df = pd.merge(res, cost_funcs, how='outer',left_index=True, right_index=True)
    print('check point #4')
    km_df.drop(columns='cost_res', inplace = True)

    km_df['avg_final_cost'] = cost_funcs.apply(np.nanmean, axis =1)
    km_df['std_final_cost'] = cost_funcs.apply(np.nanstd, axis =1)
    km_df['min_final_cost'] = cost_funcs.apply(min, axis =1)
    km_df['max_final_cost'] = cost_funcs.apply(max, axis =1)

    km_df.to_csv('km_df_test_para.csv')

# km_df

Получившийся CSV выглядит примерно так: образец снимка экрана CSV

Здесь n - количество прогонов накаждое ядро ​​и orig_n - это общее количество прогонов, на которых я должен выполнить анализ, время записи, проверка стандартного значения, среднее значение и т. д. Столбцы 0,1,2, ... являются результатами каждого отдельного прогона, причем имя столбца равноколичество прогонов на одном ядре.

Теперь мне нужно, чтобы все эти прогоны были сгруппированы по n_orig. Но понятия не имею, как сказать пандам поместить все значения с одинаковыми n_orig и k в одну строку. Как вы можете сказать, я очень плохо знаком с mpi и не знаю, как еще собирать мои данные. Команды Gather и Gatherv продолжают сбрасывать ошибку «0».

Буду признателен за любую помощь, которую вы можете оказать:)

1 Ответ

0 голосов
/ 31 октября 2019

Я не совсем понимаю, что вы имеете в виду, но, может быть, вам будет достаточно отсортировать строки по n_orig и k_val?

km_df.sort_values(by=['n_orig', 'k_val'], inplace=True)

Или, может быть, вы можете разбить km_df на меньшие кадры данныхпостоянные n_orig и k_val:

from itertools import product

groups = []
for n_orig, k_val in product(km_df['n_orig'].unique(), km_df['k_val'].unique()):
    sel = (km_df['n_orig'] == n_orig) & (km_df['k_val'] == k_val)
    groups.append(km_df[sel])
...