Передача данных от нескольких производителей C ++ потребителю Python - PullRequest
0 голосов
/ 08 мая 2019

Я пытаюсь расширить поток Python несколькими одновременно работающими потоками C ++ (через Boost.Python).После запуска потоки C ++ работают неопределенно долго, генерируют данные (например, случайное число) и передают эти данные обратно в поток Python.Таким образом, это сценарий с несколькими производителями / одним потребителем, когда производители пишутся на C ++, а потребитель пишется на Python.Есть какой-либо способ сделать это?Можно ли выставить очередь из C ++ в поток Python?

Я видел много решений (например, this ), которые вводят расширение C ++ из Python, освобождают GIL, создают параллельные потоки C ++,запускать потоки параллельно без GIL, завершать и возвращать результаты в код Python.Однако они не запускают потоки C ++ бесконечно и не используют очередь для передачи данных обратно в поток Python.

Это чистая версия C ++, которую я хочу выполнить.Все внутри main должно быть перемещено в основной поток Python, и очередь должна «соединить» потоки C ++ с основным потоком Python.

std::vector< Queue<int> *> packet_queues;


void cpp_thread(int thread_id) {
    int counter = 0;
    while(1) {
        counter++;
        packet_queues[thread_id]->push(counter);
    }
}


np::ndarray get_packet(void) {
    np::ndarray packet = np::zeros(p::make_tuple(packet_queues.size()), np::dtype::get_builtin<int>());
    for(int i = 0; i < packet_queues.size(); i++) {
        int counter;
        packet_queues[i]->pop(counter);
        packet[i] = counter;
    }
    return packet;
}


int main(int argc, char **argv)
{
    // creates Queues
    for (int i = 0; i < 10; i++) {
        packet_queues.push_back(new Queue<int>());
    }

    // start C++ threads
    std::vector<std::thread *> cpp_threads;
    for(int i = 0; i < 10; i++) {
        cpp_threads.push_back(new std::thread(cpp_thread, i));
    }

    while (1) {

        np::ndarray packet = get_packet();

        std::cout << "Packet :: " << p::extract<char const *>(p::str(packet)) << std::endl;
    }

    // join cpp threads
    for(int i = 0; i < cpp_threads.size(); i++) {
        cpp_threads[i]->join();
        delete cpp_threads[i];
    }

    return 0;
}

Используемая реализация очереди: this .

...