Как назначить numpy.ndarray для временной переменной в цикле nogil в Cython? - PullRequest
0 голосов
/ 09 января 2019

Я пытаюсь внедрить неявную модель рекомендации, и у меня возникают проблемы со временем выполнения кода, вычисляя лучшие 5 предложений для ~ 11kk пользователей по ~ 100k элементов.

Мне удалось частично решить проблему с помощью numpy с помощью некоторых блесков Cython (в блокноте jupyter). В строках с сортировкой по-прежнему используется одно ядро:

%%cython -f
# cython: language_level=3
# cython: boundscheck=False
# cython: wraparound=False
# cython: linetrace=True
# cython: binding=True
# distutils: define_macros=CYTHON_TRACE_NOGIL=1
from cython.parallel import parallel, prange
import numpy as np
from tqdm import tqdm

def test(users_items=np.random.rand(11402139//1000, 134751//100)
        , int N=5, show_progress=True, int num_threads=1):
    # Define User count and loops indexes
    cdef int users_c = users_items.shape[0], u, i
    # Predefine zero 2-D C-ordered array for recommendations
    cdef int[:,::1] users_recs = np.zeros((users_c, N), dtype=np.intc)
    for u in tqdm(range(users_c), total=users_c, disable=not show_progress):
        # numpy .dot multiplication using multiple cores
        scores = np.random.rand(134751//1000, 10).dot(np.random.rand(10))
        # numpy partial sort
        ids_partial = np.argpartition(scores, -N)[-N:]
        ids_top = ids_partial[np.argsort(scores[ids_partial])]
        # Fill predefined 2-D array
        for i in range(N):
            users_recs[u, i] = ids_top[i]
    return np.asarray(users_recs)
# Working example
tmp = test()

Я это профилировал - np.argpartition потребляет 60% времени функции и использует одно ядро. Я пытаюсь сделать это параллельно, потому что у меня есть сервер с 80 ядрами. Итак, я выполняю операцию .dot для подмножества пользователей (использую несколько ядер) и планирую параллельно заполнять пустой предопределенный массив, используя результаты сортировки по куску (использующие одно ядро), но я застрял с ошибкой из заголовка вопроса:

%%cython -f
# cython: language_level=3
# cython: boundscheck=False
# cython: wraparound=False
# cython: linetrace=True
# cython: binding=True
# distutils: define_macros=CYTHON_TRACE_NOGIL=1
from cython.parallel import parallel, prange
import numpy as np
from tqdm import tqdm
from math import ceil
def test(int N=10, show_progress=True, int num_threads=1):
    # Define User and Item count and loops indexes
    cdef int users_c = 11402139//1000, items_c = 134751//100, u, i, u_b
    # Predefine zero 2-D C-ordered array for recommendations
    cdef int[:,::1] users_recs = np.zeros((users_c, N), dtype=np.intc)
    # Define memoryview var
    cdef float[:,::1] users_items_scores_mv
    progress = tqdm(total=users_c, disable=not show_progress)
    # For a batch of Users
    for u_b in range(5):
        # Use .dot operation which use multiple cores
        users_items_scores = np.random.rand(num_threads, 10).dot(np.random.rand(134751//100, 10).T)
        # Create memory view to 2-D array, which I'm trying to sort row wise
        users_items_scores_mv = users_items_scores
        # Here it starts, try to use numpy sorting in parallel
        for u in prange(num_threads, nogil=True, num_threads=num_threads):
            ids_partial = np.argpartition(users_items_scores_mv[u], items_c-N)[items_c-N:]
            ids_top = ids_partial[np.argsort(users_items_scores_mv[u][ids_partial])]
            # Fill predefined 2-D array
            for i in range(N):
                users_recs[u_b + u, i] = ids_top[i]
        progress.update(num_threads)
    progress.close()
    return np.asarray(users_recs)

и получил это ( полная ошибка ):

Error compiling Cython file:
------------------------------------------------------------
...
        # Create memory view to 2-D array,
        # which I'm trying to sort row wise
        users_items_scores_mv = users_items_scores
        # Here it starts, try to use numpy sorting in parallel
        for u in prange(num_threads, nogil=True, num_threads=num_threads):
            ids_partial = np.argpartition(users_items_scores_mv[u], items_c-N)[items_c-N:]
           ^
------------------------------------------------------------

/datascc/enn/.cache/ipython/cython/_cython_magic_201b296cd5a34240b4c0c6ed3e58de7c.pyx:31:12: Assignment of Python object not allowed without gil

Я читал о просмотрах памяти и неправильном размещении, но не нашел примера, применимого к моей ситуации.

1 Ответ

0 голосов
/ 16 января 2019

Я закончил с пользовательской функцией C ++, которая заполняет numpy массив параллельно с nogil через openmp. Это потребовало переписать частичную сортировку argpartition numpy с помощью Cython. Алгоритм такой (3-4 можно зациклить):

  1. определить пустой массив A [i, j] и представление памяти B_mv [i, k]; где «i» - размер партии, «j» - некоторые столбцы, а «k» - количество элементов, которые необходимо вернуть после сортировки
  2. создавать указатели в памяти A & B
  3. выполнить некоторые вычисления и заполнить А данными
  4. итерация параллельно по i-s и заполнение B
  5. преобразование результата в читаемую форму

Решение состоит из:

topnc.h - заголовок реализации пользовательской функции:

/* "Copyright [2019] <Tych0n>"  [legal/copyright] */
#ifndef IMPLICIT_TOPNC_H_
#define IMPLICIT_TOPNC_H_

extern void fargsort_c(float A[], int n_row, int m_row, int m_cols, int ktop, int B[]);

#endif  // IMPLICIT_TOPNC_H_

topnc.cpp - тело функции:

#include <vector>
#include <limits>
#include <algorithm>
#include <iostream>

#include "topnc.h"

struct target {int index; float value;};
bool targets_compare(target t_i, target t_j) { return (t_i.value > t_j.value); }

void fargsort_c ( float A[], int n_row, int m_row, int m_cols, int ktop, int B[] ) {
    std::vector<target> targets;
    for ( int j = 0; j < m_cols; j++ ) {
        target c;
        c.index = j;
        c.value = A[(n_row*m_cols) + j];
        targets.push_back(c);
    }
    std::partial_sort( targets.begin(), targets.begin() + ktop, targets.end(), targets_compare );
    std::sort( targets.begin(), targets.begin() + ktop, targets_compare );
    for ( int j = 0; j < ktop; j++ ) {
        B[(m_row*ktop) + j] = targets[j].index;
    }
}

ctools.pyx - пример использования

# distutils: language = c++
# cython: language_level=3
# cython: boundscheck=False
# cython: wraparound=False
# cython: nonecheck=False
from cython.parallel import parallel, prange
import numpy as np
cimport numpy as np

cdef extern from "topnc.h":
    cdef void fargsort_c ( float A[], int n_row, int m_row, int m_cols, int ktop, int B[] ) nogil

A = np.zeros((1000, 100), dtype=np.float32)
A[:] = np.random.rand(1000, 100).astype(np.float32)
cdef:
    float[:,::1] A_mv = A
    float* A_mv_p = &A_mv[0,0]
    int[:,::1] B_mv = np.zeros((1000, 5), dtype=np.intc)
    int* B_mv_p = &B_mv[0,0]
    int i
for i in prange(1000, nogil=True, num_threads=10, schedule='dynamic'):
    fargsort_c(A_mv_p, i, i, 100, 5, B_mv_p)
B = np.asarray(B_mv)

compile.py - файл компиляции; запустите его с помощью команды "python compile.py build_ext --inplace -f" в терминале (это приведет к файлу ctools.cpython - *. so, который вы затем будете использовать для импорта):

from os import path
import numpy
from setuptools import setup, Extension
from Cython.Distutils import build_ext
from Cython.Build import cythonize

ext_utils = Extension(
    'ctools'
    , sources=['ctools.pyx', 'topnc.cpp']
    , include_dirs=[numpy.get_include()]
    , extra_compile_args=['-std=c++0x', '-Os', '-fopenmp']
    , extra_link_args=['-fopenmp']
    , language='c++'
)

setup(
    name='ctools',
    setup_requires=[
        'setuptools>=18.0'
        , 'cython'
        , 'numpy'
    ]
    , cmdclass={'build_ext': build_ext}
    , ext_modules=cythonize([ext_utils]),
)

Он использовался для добавления функции "рекомендовать все" в неявную модель ALS.

...