Переписать цикл for в чистом NumPy, чтобы уменьшить время выполнения - PullRequest
7 голосов
/ 07 апреля 2010

I недавно спросил о попытке оптимизировать цикл Python для научного приложения и получил отличный, умный способ перекодировать его в NumPy, который сократил время выполнения примерно в 100 раз для меня!

Однако вычисление значения B фактически вложено в несколько других циклов, потому что оно оценивается на регулярной сетке позиций. Есть ли такая же умная перезапись NumPy, чтобы сэкономить время на этой процедуре?

Я подозреваю, что прирост производительности для этой части будет менее заметным, и, возможно, недостатки будут состоять в том, что будет невозможно сообщить пользователю о ходе расчета, что результаты не могут быть записаны в выходной файл до конца расчета, и, возможно, что выполнение этого за один огромный шаг будет иметь последствия для памяти? Можно ли обойти это?

import numpy as np
import time

def reshape_vector(v):
    b = np.empty((3,1))
    for i in range(3):
        b[i][0] = v[i]
    return b

def unit_vectors(r):
     return r / np.sqrt((r*r).sum(0))

def calculate_dipole(mu, r_i, mom_i):
    relative = mu - r_i
    r_unit = unit_vectors(relative)
    A = 1e-7

    num = A*(3*np.sum(mom_i*r_unit, 0)*r_unit - mom_i)
    den = np.sqrt(np.sum(relative*relative, 0))**3
    B = np.sum(num/den, 1)
    return B

N = 20000 # number of dipoles
r_i = np.random.random((3,N)) # positions of dipoles
mom_i = np.random.random((3,N)) # moments of dipoles
a = np.random.random((3,3)) # three basis vectors for this crystal
n = [10,10,10] # points at which to evaluate sum
gamma_mu = 135.5 # a constant

t_start = time.clock()
for i in range(n[0]):
    r_frac_x = np.float(i)/np.float(n[0])
    r_test_x = r_frac_x * a[0]
    for j in range(n[1]):
        r_frac_y = np.float(j)/np.float(n[1])
        r_test_y = r_frac_y * a[1]
        for k in range(n[2]):
            r_frac_z = np.float(k)/np.float(n[2])
            r_test = r_test_x +r_test_y + r_frac_z * a[2]
            r_test_fast = reshape_vector(r_test)
            B = calculate_dipole(r_test_fast, r_i, mom_i)
            omega = gamma_mu*np.sqrt(np.dot(B,B))
            # write r_test, B and omega to a file
    frac_done = np.float(i+1)/(n[0]+1)
    t_elapsed = (time.clock()-t_start)
    t_remain = (1-frac_done)*t_elapsed/frac_done
    print frac_done*100,'% done in',t_elapsed/60.,'minutes...approximately',t_remain/60.,'minutes remaining'

Ответы [ 2 ]

2 голосов
/ 07 апреля 2010

Если вы профиль своего кода, вы увидите, что 99% времени выполнения в calculate_dipole, поэтому сокращение времени для этого цикла действительно не даст заметного сокращения времени выполнения.Вам все еще нужно сосредоточиться на calc_dipole, если вы хотите сделать это быстрее.Я попробовал свой код Cython для calculate_dipole на этом и получил сокращение примерно в 2 раза в общем времени.Могут быть и другие способы улучшить код Cython.

2 голосов
/ 07 апреля 2010

Одна очевидная вещь, которую вы можете сделать, это заменить строку

r_test_fast = reshape_vector(r_test)

на

r_test_fast = r_test.reshape((3,1))

Вероятно, не будет иметь большого значения в производительности, но в любом случае это имеет смыслиспользовать встроенные функции NumPy вместо изобретения колеса.

Вообще говоря, как вы, наверное, уже заметили, трюк с оптимизацией NUMPY состоит в том, чтобы выразить алгоритм с помощью NUMPY операций с целым массивом или, по крайней мере,с кусочками вместо итерации по каждому элементу в коде Python.То, что предотвращает этот вид «векторизации», это так называемые циклические зависимости, то есть циклы, где каждая итерация зависит от результата предыдущей итерации.Если коротко взглянуть на ваш код, у вас нет такой вещи, и должна быть возможность векторизовать ваш код просто отлично.

РЕДАКТИРОВАТЬ: Одно решение

У меня нетпроверил, что это правильно, но должно дать вам представление о том, как к нему подойти.

Сначала возьмем функцию cartesian (), которую мы будем использовать .Тогда


def calculate_dipole_vect(mus, r_i, mom_i):
    # Treat each mu sequentially
    Bs = []
    omega = []
    for mu in mus:
        rel = mu - r_i
        r_norm = np.sqrt((rel * rel).sum(1))
        r_unit =  rel / r_norm[:, np.newaxis]
        A = 1e-7

        num = A*(3*np.sum(mom_i * r_unit, 0)*r_unit - mom_i)
        den = r_norm ** 3
        B = np.sum(num / den[:, np.newaxis], 0)
        Bs.append(B)
        omega.append(gamma_mu * np.sqrt(np.dot(B, B)))
    return Bs, omega


# Transpose to get more "natural" ordering with row-major numpy
r_i = r_i.T
mom_i = mom_i.T

t_start = time.clock()
r_frac = cartesian((np.arange(n[0]) / float(n[0]),
                    np.arange(n[1]) / float(n[1]),
                    np.arange(n[2]) / float(n[2])))
r_test = np.dot(r_frac, a)
B, omega = calculate_dipole_vect(r_test, r_i, mom_i)

print 'Total time for vectorized: %f s' % (time.clock() - t_start)

Что ж, в моем тестировании это на самом деле немного медленнее, чем основанный на циклах подход, с которого я начинал.Дело в том, что в оригинальной версии этого вопроса он уже был векторизован с помощью операций с целыми массивами над массивами формы (20000, 3), поэтому любая дальнейшая векторизация на самом деле не приносит дополнительных преимуществ.Фактически, это может ухудшить производительность, как указано выше, возможно, из-за больших временных массивов.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...