Как векторизовать сложный код, содержащий eigvalsh в Python - PullRequest
0 голосов
/ 09 декабря 2018

У меня есть следующий код (извините, он не слишком минимален, я уже пытался уменьшить его по сравнению с оригиналом).

В основном, у меня проблемы с производительностью при запуске метода / функции eval_s(), в которой я:

1) найти 4 собственных значения эрмитовой матрицы 4x4 с eigvalsh()

2) сложить обратных собственных значений в переменную result

3) и я повторяю шаги 1 и 2 для многих матриц, параметризованных x,y,z, сохраняя накопленную сумму в result.

Сколько раз я повторяю свои вычисления (нахождение собственных значений и суммирование) на шаге 3 зависит от переменной ksep в моем коде, и мне нужно, чтобы это число увеличивалось в моем фактическом коде (т. е. ksep должно уменьшиться ).Но вычисления в eval_s() имеют цикл for x,y,z, который, как я предполагаю, действительно замедляет процесс.[Попробуйте ksep=0.5, чтобы понять, что я имею в виду.]

Есть ли способ векторизации метода, указанного в моем примере кода (или вообще функций, которые включают в себя поиск собственных значений параметризованных матриц)?

Код:

import numpy as np
import sympy as sp
import itertools as it
from sympy.abc import x, y, z


class Solver:
    def __init__(self, vmat):
        self._vfunc = sp.lambdify((x, y, z),
                                  expr=vmat,
                                  modules='numpy')
        self._q_count, self._qs = None, []  # these depend on ksep!

    ################################################################
    # How to vectorize this?
    def eval_s(self, stiff):
        assert len(self._qs) == self._q_count, "Run 'populate_qs' first!"
        result = 0
        for k in self._qs:
            evs = np.linalg.eigvalsh(self._vfunc(*k))
            result += np.sum(np.divide(1., (stiff + evs)))
        return result.real - 4 * self._q_count
    ################################################################

    def populate_qs(self, ksep: float = 1.7):
        self._qs = [(kx, ky, kz) for kx, ky, kz
                    in it.product(np.arange(-3*np.pi, 3.01*np.pi, ksep),
                                  np.arange(-3*np.pi, 3.01*np.pi, ksep),
                                  np.arange(-3*np.pi, 3.01*np.pi, ksep))]
        self._q_count = len(self._qs)


def test():
    vmat = sp.Matrix([[1, sp.cos(x/4+y/4), sp.cos(x/4+z/4), sp.cos(y/4+z/4)],
                      [sp.cos(x/4+y/4), 1, sp.cos(y/4-z/4), sp.cos(x/4 - z/4)],
                      [sp.cos(x/4+z/4), sp.cos(y/4-z/4), 1, sp.cos(x/4-y/4)],
                      [sp.cos(y/4+z/4), sp.cos(x/4-z/4), sp.cos(x/4-y/4), 1]]) * 2
    solver = Solver(vmat)
    solver.populate_qs(ksep=1.7)  # <---- Performance starts to worsen (in eval_s) when ksep is reduced!
    print(solver.eval_s(0.65))


if __name__ == "__main__":
    import timeit
    print(timeit.timeit("test()", setup="from __main__ import test", number=100))

ps Симпатичная часть кода может показаться странной, но она служит цели в моем исходном коде.

Ответы [ 2 ]

0 голосов
/ 09 декабря 2018

@ тел сделал большую часть работы.Вот как вы можете получить еще 2-кратное ускорение поверх их 20-кратного.

Делайте линейную алгебру вручную.Когда я попробовал это, я был шокирован тем, насколько расточительна крошка на маленьких матрицах:

>>> from timeit import timeit

# using eigvalsh
>>> print(timeit("test(False, 0.1)", setup="from __main__ import test", number=3))
-2301206.495955009
-2301206.495955009
-2301206.495955009
55.794611917983275
>>> print(timeit("test(False, 0.3)", setup="from __main__ import test", number=5))
-85240.46154498367
-85240.46154498367
-85240.46154498367
-85240.46154498367
-85240.46154498367
3.400342195003759

# by hand
>>> print(timeit("test(True, 0.1)", setup="from __main__ import test", number=3))
-2301206.495955076
-2301206.495955076
-2301206.495955076
26.67294767702697
>>> print(timeit("test(True, 0.3)", setup="from __main__ import test", number=5))
-85240.46154498379
-85240.46154498379
-85240.46154498379
-85240.46154498379
-85240.46154498379
1.5047460949863307

Обратите внимание, что часть ускорения, вероятно, замаскирована общим кодом, на одной только линейной алгебре, кажется, больше, хотя я нене проверяйте слишком остро.

Одно предостережение: я использую дополнение Шура на разбиении матриц 2 на 2 для вычисления диагональных элементов обратного.Это потерпит неудачу, если дополнения Шура не существуют, т. Е. Если верхняя левая или нижняя правая субматрица необратимы.

Вот модифицированный код:

import itertools as it
import numpy as np
import sympy as sp
from sympy.abc import x, y, z
from sympy.core.numbers import Number
from sympy.utilities.lambdify import implemented_function

xones = implemented_function('xones', lambda x: np.ones(len(x)))
lfuncs = {'xones': xones}

def vectorizemat(mat):
    ret = mat.copy()
    for x in mat.free_symbols: break
    for i,j in it.product(*(range(s) for s in mat.shape)):
        if isinstance(mat[i,j], Number):
            ret[i,j] = xones(x) * mat[i,j]
    return ret

class Solver:
    def __init__(self, vmat):
        vmat = vectorizemat(vmat)
        self._vfunc = sp.lambdify((x, y, z),
                                  expr=vmat,
                                  modules=[lfuncs, 'numpy'])
        self._q_count, self._qs = None, []  # these depend on ksep!

    def eval_s_vectorized_completely(self, stiff):
        assert len(self._qs) == self._q_count, "Run 'populate_qs' first!"
        mats = self._vfunc(*self._qs.T).T
        evs = np.linalg.eigvalsh(mats)
        result = np.sum(np.divide(1., (stiff + evs)))
        return result.real - 4 * self._q_count

    def eval_s_pp(self, stiff):
        assert len(self._qs) == self._q_count, "Run 'populate_qs' first!"
        mats = self._vfunc(*self._qs.T).T
        np.einsum('...ii->...i', mats)[...] += stiff
        (A, B), (C, D) = mats.reshape(-1, 2, 2, 2, 2).transpose(1, 3, 0, 2, 4)
        res = 0
        for AA, BB, CC, DD in ((A, B, C, D), (D, C, B, A)):
            (a, b), (c, d) = DD.transpose(1, 2, 0)
            rdet = 1 / (a*d - b*b)[:, None]
            iD = DD[..., ::-1, ::-1].copy()
            iD.reshape(-1, 4)[..., 1:3] *= -rdet
            np.einsum('...ii->...i', iD)[...] *= rdet
            (Aa, Ab), (Ac, Ad) = AA.transpose(1, 2, 0)
            (Ba, Bb), (Bc, Bd) = BB.transpose(1, 2, 0)
            (Da, Db), (Dc, Dd) = iD.transpose(1, 2, 0)
            a = Aa - Ba*Ba*Da - 2*Bb*Ba*Db - Bb*Bb*Dd
            d = Ad - Bd*Bd*Dd - 2*Bc*Bd*Db - Bc*Bc*Da
            b = Ab - Ba*Bc*Da - Ba*Bd*Db - Bb*Bd*Dd - Bb*Bc*Dc
            res += ((a + d) / (a*d - b*b)).sum()
        return res - 4 * self._q_count

    def populate_qs(self, ksep: float = 1.7):
        self._qs = np.array([(kx, ky, kz) for kx, ky, kz
                    in it.product(np.arange(-3*np.pi, 3.01*np.pi, ksep),
                                  np.arange(-3*np.pi, 3.01*np.pi, ksep),
                                  np.arange(-3*np.pi, 3.01*np.pi, ksep))])
        self._q_count = len(self._qs)


def test(manual=False, ksep=0.3):
    vmat = sp.Matrix([[1, sp.cos(x/4+y/4), sp.cos(x/4+z/4), sp.cos(y/4+z/4)],
                      [sp.cos(x/4+y/4), 1, sp.cos(y/4-z/4), sp.cos(x/4 - z/4)],
                      [sp.cos(x/4+z/4), sp.cos(y/4-z/4), 1, sp.cos(x/4-y/4)],
                      [sp.cos(y/4+z/4), sp.cos(x/4-z/4), sp.cos(x/4-y/4), 1]]) * 2
    solver = Solver(vmat)
    solver.populate_qs(ksep=ksep)  # <---- Performance starts to worsen (in eval_s) when ksep is reduced!
    if manual:
        print(solver.eval_s_pp(0.65))
    else:
        print(solver.eval_s_vectorized_completely(0.65))
0 голосов
/ 09 декабря 2018

Вы можете, и вот как:

def eval_s_vectorized(self, stiff):
    assert len(self._qs) == self._q_count, "Run 'populate_qs' first!"
    mats = np.stack([self._vfunc(*k) for k in self._qs], axis=0)
    evs = np.linalg.eigvalsh(mats)
    result = np.sum(np.divide(1., (stiff + evs)))
    return result.real - 4 * self._q_count

Это по-прежнему оставляет оценку выражения Sympy невекторизованной.Эту часть немного сложно векторизовать, в основном из-за 1 s во входной матрице.Вы можете сделать полностью векторизованную версию своего кода, изменив Solver так, чтобы он заменял скалярные константы константами массива в vmat:

import itertools as it
import numpy as np
import sympy as sp
from sympy.abc import x, y, z
from sympy.core.numbers import Number
from sympy.utilities.lambdify import implemented_function

xones = implemented_function('xones', lambda x: np.ones(len(x)))
lfuncs = {'xones': xones}

def vectorizemat(mat):
    ret = mat.copy()
    # get the first element of the set of symbols that mat uses
    for x in mat.free_symbols: break
    for i,j in it.product(*(range(s) for s in mat.shape)):
        if isinstance(mat[i,j], Number):
            ret[i,j] = xones(x) * mat[i,j]
    return ret

class Solver:
    def __init__(self, vmat):
        self._vfunc = sp.lambdify((x, y, z),
                                  expr=vectorizemat(vmat),
                                  modules=[lfuncs, 'numpy'])
        self._q_count, self._qs = None, []  # these depend on ksep!

    def eval_s_vectorized_completely(self, stiff):
        assert len(self._qs) == self._q_count, "Run 'populate_qs' first!"
        evs = np.linalg.eigvalsh(self._vfunc(*self._qs.T).T)
        result = np.sum(np.divide(1., (stiff + evs)))
        return result.real - 4 * self._q_count

    def populate_qs(self, ksep: float = 1.7):
        self._qs = np.array([(kx, ky, kz) for kx, ky, kz
                    in it.product(np.arange(-3*np.pi, 3.01*np.pi, ksep),
                                  np.arange(-3*np.pi, 3.01*np.pi, ksep),
                                  np.arange(-3*np.pi, 3.01*np.pi, ksep))])
        self._q_count = len(self._qs)

Тестирование / Синхронизация

Для малых ksep векторизованная версия примерно в 2 раза быстрее оригинала, а полностью векторизованная версия примерно в 20 раз быстрее:

# old version for ksep=.3
import timeit
print(timeit.timeit("test()", setup="from __main__ import test", number=10))
-85240.46154500882
-85240.46154500882
-85240.46154500882
-85240.46154500882
-85240.46154500882
-85240.46154500882
-85240.46154500882
-85240.46154500882
-85240.46154500882
-85240.46154500882
118.42847006605007

# vectorized version for ksep=.3
import timeit
print(timeit.timeit("test()", setup="from __main__ import test", number=10))
-85240.46154498367
-85240.46154498367
-85240.46154498367
-85240.46154498367
-85240.46154498367
-85240.46154498367
-85240.46154498367
-85240.46154498367
-85240.46154498367
-85240.46154498367
64.95763925800566

# completely vectorized version for ksep=.3
import timeit
print(timeit.timeit("test()", setup="from __main__ import test", number=10))
-85240.46154498367
-85240.46154498367
-85240.46154498367
-85240.46154498367
-85240.46154498367
-85240.46154498367
-85240.46154498367
-85240.46154498367
-85240.46154498367
-85240.46154498367
5.648927717003971

Ошибка округления в результатах векторизованной версии немного отличается от оригинала.Вероятно, это связано с различиями в том, как вычисляется сумма в result.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...