Транспортировка вызовов функций между потоками - PullRequest
1 голос
/ 06 марта 2020

Передо мной стоит очень сложная задача. Что довольно легко описать, но сложно реализовать, и я действительно не знаю, как это сделать. Может быть, кто-то знает более простой способ:

Я хочу сохранить параметры (стек и регистры) вызова функции в кучу, а после этого восстановить эти параметры в другом потоке.

Предположим, следующая функция:

int worker(int p1, int p2, ...) // variadic
{
  return enq(); // will pack the parameters and return a Job structure

  ... // some heavy work that must be executed by another thread
}

Моя отправная точка - две структуры, первая содержит текущий кадр стека

struct StackFrame
{
  struct StackFrame *next;
  void *returnAddr;
};

А вторая содержит сохраненные параметры и точку возврата рабочего

struct Job
{
  void *registers[];
  size_t regCount; 

  void *stackFrame;
  size_t frameSize;  

  void *workerAddr;  
};

Теперь функция enq() будет упаковывать рабочие параметры и ставить в очередь задание, используя пул потоков (уже работающий). Я предвидел что-то вроде этого:

Job* enq()
{ 
  // get the caller stackFrame 
  register struct StackFrame *fp __builtin_frame_address(1);

  // save the stack parameters of the caller to the heap,
  Job *job = new Job;
  job->frameSize = frame->next - frame;
  job->frameContent = malloc (job->frameSize);
  memcpy (job->stackContent, frame, job->frameSize );

  job->workerAddr = frame.workerAddr // to where the worker Thread will jump 

  // !! I'm stuck here !! 

  // copy all the registers to memory (ideally only the used as parameters)
  job.registers = ... // in i32 there was an instruction called PUSHA, but not on i64

  return job // real, threadPool.push(job)
}

Теперь, на рабочей стороне, функция deq() выполнит обратную операцию enq(), что-то вроде этого:

void deq(Job *job) 
{ 
  // real, Job *job = threadPool.pop()

  // restore the registers parameters
  POPA(job->registers, job->regCount) // just like (i32 POPA)

  // restore the stack frame. 
  push(job->frameContent, job->frameSize) 

  // execute the worker 
  call(job->workerAddr);

  // mark the Job as done
}

In на стороне клиента, я хочу вызвать эту функцию так же, как:

  Job* promise = worker(1, 2, "a variadic param");
  wait(promise); // or wait(promise, callback)

Функция enq() должна упаковать параметры вызывающей стороны в структуру Job изнутри рабочей функции.

The wait() функция не является реальной проблемой, и она здесь, чтобы показать, как все это должно работать.

Это все, что у меня есть.

Знаете ли вы, как решить любой из этих пропущенных шагов и помочь мне немного приблизиться к моим намерениям? Или даже лучше, более простой и высокоуровневый способ сделать это?

Я использую G CC 9.2.1 в Ubuntu 19 64 бит.

Ответы [ 3 ]

2 голосов
/ 10 марта 2020

Вот очень простое и эффективное решение, реализующее ваши enq и deq (с небольшими изменениями для краткости).

Я оставил более раннее решение со встроенной сборкой, потому что вы упомянули регистры и стек, но это Решение не требует никакой сборки и будет работать для любой функции с любым числом и типом параметров.

Для быстрого просмотра «стек и регистры» хранятся в экземплярах ThreadFunction. Вы просто используете ThreadFunction :: Call, чтобы вызвать сохраненную функцию с ее сохраненными параметрами. Основная функция очень проста и просто использует enq и deq с некоторыми потоками, но вы можете использовать GetThreadInvokable, чтобы упаковать функцию и ее аргументы в объект ThreadFunction, который затем можно ставить в очередь в любое время.

#include <iostream>
#include <vector>
#include <memory>
#include <thread>
#include <functional>
#include <mutex>
using namespace std;

struct ThreadFunction {
    virtual void Call() = 0;
};

template <typename F, typename ... Args>
struct ThreadFunctionPacked : public ThreadFunction {
    std::function<void(void)> m_lambda;

    ThreadFunctionPacked(F pFunc, Args ... args) {
        m_lambda = [pFunc, args...]() {
            pFunc(args...);
        };
    }

    virtual void Call() {
        m_lambda();
    }
};

template <typename F, typename ... Args>
ThreadFunction* GetThreadInvokable(F pFunc, Args... args) {
    ThreadFunction* ret = (ThreadFunction*) new ThreadFunctionPacked<F, Args...> (pFunc, args...);
    return ret;
}

struct Job {
    ThreadFunction* m_funcAndArgs = NULL;
    Job(ThreadFunction* p) {
        m_funcAndArgs = p;
    }

    void Run() {
        m_funcAndArgs->Call();
    }
};

std::mutex mutexJobs;
std::vector<Job*> jobs;
std::mutex mutexConsole;

template <typename F, typename ... Args>
void enq(F pFunc, Args... args) {
    std::lock_guard<std::mutex> lock(mutexJobs);
    jobs.push_back(new Job(GetThreadInvokable(pFunc, args...)));
}

void deq() {
    Job* job = NULL;
    {
        std::lock_guard<std::mutex> lock(mutexJobs);
        if (jobs.empty()) {
            return;
        }

        job = jobs[0];
        jobs.erase(jobs.begin());
    }

    if (job != NULL) {
        job->Run();
    }
}

void testAdd(int a, int b) {
    std::lock_guard<std::mutex> lock(mutexConsole);
    cout << a + b << endl;
}

void testMinus(int a, int b) {
    std::lock_guard<std::mutex> lock(mutexConsole);
    cout << a - b << endl;
}

void testVoid() {
    std::lock_guard<std::mutex> lock(mutexConsole);
    cout << "Void function" << endl;
}

void testPrint(std::string str) {
    std::lock_guard<std::mutex> lock(mutexConsole);
    cout << str << endl;
}

void thread1Func() {
    deq();
    deq();
    deq();
}

void thread2Func() {
    deq();
    deq();
    deq();
}

int main() {
    enq(testAdd, 5, 3);
    enq(testAdd, 10, 50);
    enq(testMinus, 7, 20);
    enq(testVoid);
    enq(testPrint, "Hello");

    std::thread t1(thread1Func);
    std::thread t2(thread2Func);

    t1.join();
    t2.join();

    return 0;
}

Как вы можете видеть, я использовал несколько мьютексов для многопоточности для проверки концепции, но вы должны сделать это более надежным, используя пул потоков в своем решении, как вы запланировали, а также заметьте, что я не реализовывал никаких деструкторов или делайте любое удаление динамической c памяти, что вы, конечно, должны делать (достаточно обернуть указатели в std :: unique_ptr).

Я оставлю свое предыдущее решение на тот случай, если кто-то захочет побороться в реестре. и управление стека, но я думаю, что вы будете счастливы с этим решением.

0 голосов
/ 06 марта 2020

Вот пример быстрого решения, которое делает то, что вы хотите (обратите внимание, что оно использует синтаксис встроенного ассемблера Visual Studio, поэтому оно может немного отличаться от синтаксиса G CC. Оно работает для функций с любым количеством параметров, кроме вас ». Я должен был бы найти способ получения возвращаемого типа из вызванной функции, если бы вы хотели (было бы тривиально, если бы они все возвращали одинаковый тип значения, но небо - предел).

#include <iostream>
#include <vector>
#include <memory>
using namespace std;

template <class T>
vector<pair<size_t, void*>> PackParams(T param) {
    vector<pair<size_t, void*>> ret;
    T* paramMem = new T;
    memcpy(paramMem, &param, sizeof(T));
    ret.push_back(pair<size_t, void*>(sizeof(T), paramMem));

    return ret;
}

template <class T, typename... Targs>
vector<pair<size_t, void*>> PackParams(T param, Targs... otherParams) {
    vector<pair<size_t, void*>> ret;
    T* paramMem = new T;
    memcpy(paramMem, &param, sizeof(T));
    ret.push_back(pair<size_t, void*>(sizeof(T), paramMem));

    vector<pair<size_t, void*>> otherPack = PackParams(otherParams...);

    for (int i = 0; i < otherPack.size(); ++i) {
        ret.push_back(otherPack[i]);
    }

    return ret;
}

vector<pair<size_t, void*>> PackParams(void) {
    vector<pair<size_t, void*>> ret;
    return ret;
}

pair<size_t, void*> AlignParams(vector<pair<size_t, void*>> params) {
    int totalSize = 0;
    for (int i = 0; i < params.size(); ++i) {
        totalSize += params[i].first;
    }

    char* paramBlock = new char[totalSize];

    totalSize = 0;
    for (int i = 0; i < params.size(); ++i) {
        memcpy(&paramBlock[totalSize], params[i].second, params[i].first);
        totalSize += params[i].first;
    }

    return pair<size_t, void*>(totalSize, paramBlock);
}

int test1(int a, int b) {
    cout << a + b << endl;
    return a + b;
}

void Call(void* pFunc, void* params, int paramSize) {
    _asm {
        mov edx, paramSize
        mov ebx, params
        xor ecx,ecx
        loop1:
        push dword ptr [ebx + ecx]
        add ecx, 4
        cmp ecx,paramSize
        jl loop1
        call pFunc
        add esp, paramSize
        nop
    }

}

int main() {
    vector<pair<size_t, void*>> r = PackParams(5, 6);

    pair<size_t,void*> paramData = AlignParams(r);

    Call(test1, paramData.second, paramData.first);

    //system("pause");
    return 0;
}

Передайте paramData и указатель функции на потоки, и они могут использовать функцию Call.

Предупреждения:

Вам потребуется реализовать правильную функцию AlignParams для обеспечения 4-байтового 8-байтового или какое-либо выравнивание достигается в зависимости от требований архитектуры вашей системы.

Этот ответ соответствует соглашению о вызовах x86 stdcall. Другие соглашения о вызовах требуют другой сборки, особенно x64, которая имеет некоторые радикальные отклонения в определенном сценарии ios.

Кроме того, в интересах быстрого предоставления вам решения, я все Некоторые основные операции с памятью * 1011 (не удаляли динамически выделенную память и т. д. c.). Это просто демонстрационное решение, которое в целом показывает вам, что вам нужно сделать для более надежного решения.

Кроме того, вы можете объединить Call, PackParams и AlignParams в одну шаблонную функцию variadi c для упрощения синтаксис конечно.

0 голосов
/ 06 марта 2020

Я столкнулся с подобной проблемой некоторое время назад. В моем случае все функции, которые мне нужно было вызвать, были недействительными. Поэтому я использовал std::bind с шаблонами пакета параметров и сохранил результат в std::vector<std::function<void()>>. Чтобы возвратить указанный тип c, вы также можете вернуть std::future<T> при постановке в очередь «Заданий». Чтобы вернуть случайный тип, вы можете вернуть std::future<std::any> и оставить его вызывающей стороне, чтобы привести его к правильному типу. Вы можете взглянуть на мою реализацию здесь .

...