Есть ли способ запустить две задачи асинхронно в одном потоке? - PullRequest
2 голосов
/ 13 апреля 2020

Я работаю над программным продуктом, который выполняет интенсивные операции в основном потоке. Запуск их в отдельном потоке не поддерживается проектом и не будет изменен.

В то же время нам нужно обрабатывать движения мыши, поступающие из пользовательского интерфейса. В одном случае курсор мыши зависает, потому что основной поток занят вычислениями.

Кажется хорошим примером для введения асинхронной операции: выполнять вычисления асинхронно в отдельном потоке, пока основной поток все еще обрабатывает движения мыши. Но, как я уже говорил, он не поддерживается в текущем проекте.

Недавно я натолкнулся на идею асинхронного запуска двух задач в одном потоке. Это означает, что контекст потока переключается между двумя задачами, и каждая задача частично выполняется в течение определенного периода времени, пока каждая из них не будет завершена.

Возможно ли это в C ++? Версия языка (11 или 14) не имеет значения.
Программное обеспечение использует WinApi и стандартную очередь сообщений для получения событий мыши.

Пытался взглянуть на Microsoft PPL, но, насколько я понимаю, библиотека не имеет значения помощь в этом случае.
Спасибо всем за помощь.

Ответы [ 5 ]

1 голос
/ 13 апреля 2020

Boost.Coroutine , Boost.Context и Boost.Asio все поддерживают однопотоковый параллелизм на том или ином уровне. Сопрограммы - это кооперативные, возвращающиеся, прерываемые, возобновляемые функции. Контекст - это переключение контекста пользователя на землю. Исполнители Asio могут запланировать выполнение множества разных задач в одном потоке. Для вашего случая, я думаю, вы можете выбрать, что вам удобно вводить в ваше приложение.

РЕДАКТИРОВАТЬ

Boost.Fiber реализует мини-нитевидный «волокна» поверх библиотеки контекста.

1 голос
/ 13 апреля 2020

То, что вы ищете, это совместная многозадачность. Это возможно в одной теме. Вы можете взглянуть на сопрограммы, например, в boost или стандартной библиотеке (начиная с C ++ 20).

Вы также можете свернуть свою урезанную версию. Ключевые ключи:

  • Каждая задача должна хранить свой контекст (например, параметры) сама
  • Каждая задача должна иметь способ приостановить и возобновить операции. Он сам решает, когда следует приостановить.
  • Вам может понадобиться какой-то планировщик, который отслеживает все задачи и часто их запускает. Возможно, вы захотите сконструировать его так, чтобы GUI main l oop вызывал ваш планировщик, который работает максимум примерно 30-50 мс, передавая доступный бюджет времени для каждой из задач, которые он отслеживает.

Это вполне осуществимо, если потоки вообще не являются опцией.

0 голосов
/ 13 апреля 2020

Учитывая, что вы используете winapi и пользовательский интерфейс, поэтому у вас уже есть обработка сообщений, я бы посоветовал разбить проблемную операцию c на несколько шагов и использовать пользовательские сообщения. Попросите каждый шаг в проблемной операции c опубликовать сообщение, которое вызывает следующий шаг. Поскольку это что-то, что windows уже обрабатывает (имеет дело с сообщениями), оно должно вписаться в то, что у вас уже есть, более аккуратно, чем пытаться использовать сопрограммы или windows волокна.

Это замедлит общую обработку c операция несколько проблематичная, но пользовательский интерфейс будет реагировать.

Однако я бы также серьезно подумал об отказе от однопоточного подхода. Если ваша проблемная операция c просто берет ввод и производит вывод, перетаскивая эту операцию в отдельный поток и имея дело с результатом, когда он приходит (опять же через опубликованное сообщение), часто является очень разумным решением.

0 голосов
/ 13 апреля 2020

Поскольку вы нацелены на windows, но не имеете доступа к сопрограммам c ++ 20 (используя старый компилятор), вы можете использовать winapi Fibers, который похож на тяжелые сопрограммы.

Это задокументировано здесь: Волокна Win32 приложения

И вот пример его использования:

#include <windows.h>
#include <tchar.h>
#include <stdio.h>

VOID
__stdcall
ReadFiberFunc(LPVOID lpParameter);

VOID
__stdcall
WriteFiberFunc(LPVOID lpParameter);

void DisplayFiberInfo(void);

typedef struct
{
   DWORD dwParameter;          // DWORD parameter to fiber (unused)
   DWORD dwFiberResultCode;    // GetLastError() result code
   HANDLE hFile;               // handle to operate on
   DWORD dwBytesProcessed;     // number of bytes processed
} FIBERDATASTRUCT, *PFIBERDATASTRUCT, *LPFIBERDATASTRUCT;

#define RTN_OK 0
#define RTN_USAGE 1
#define RTN_ERROR 13

#define BUFFER_SIZE 32768   // read/write buffer size
#define FIBER_COUNT 3       // max fibers (including primary)

#define PRIMARY_FIBER 0 // array index to primary fiber
#define READ_FIBER 1    // array index to read fiber
#define WRITE_FIBER 2   // array index to write fiber

LPVOID g_lpFiber[FIBER_COUNT];
LPBYTE g_lpBuffer;
DWORD g_dwBytesRead;

int __cdecl _tmain(int argc, TCHAR *argv[])
{
   LPFIBERDATASTRUCT fs;

   if (argc != 3)
   {
      printf("Usage: %s <SourceFile> <DestinationFile>\n", argv[0]);
      return RTN_USAGE;
   }

   //
   // Allocate storage for our fiber data structures
   //
   fs = (LPFIBERDATASTRUCT) HeapAlloc(
                              GetProcessHeap(), 0,
                              sizeof(FIBERDATASTRUCT) * FIBER_COUNT);

   if (fs == NULL)
   {
      printf("HeapAlloc error (%d)\n", GetLastError());
      return RTN_ERROR;
   }

   //
   // Allocate storage for the read/write buffer
   //
   g_lpBuffer = (LPBYTE)HeapAlloc(GetProcessHeap(), 0, BUFFER_SIZE);
   if (g_lpBuffer == NULL)
   {
      printf("HeapAlloc error (%d)\n", GetLastError());
      return RTN_ERROR;
   }

   //
   // Open the source file
   //
   fs[READ_FIBER].hFile = CreateFile(
                                    argv[1],
                                    GENERIC_READ,
                                    FILE_SHARE_READ,
                                    NULL,
                                    OPEN_EXISTING,
                                    FILE_FLAG_SEQUENTIAL_SCAN,
                                    NULL
                                    );

   if (fs[READ_FIBER].hFile == INVALID_HANDLE_VALUE)
   {
      printf("CreateFile error (%d)\n", GetLastError());
      return RTN_ERROR;
   }

   //
   // Open the destination file
   //
   fs[WRITE_FIBER].hFile = CreateFile(
                                     argv[2],
                                     GENERIC_WRITE,
                                     0,
                                     NULL,
                                     CREATE_NEW,
                                     FILE_FLAG_SEQUENTIAL_SCAN,
                                     NULL
                                     );

   if (fs[WRITE_FIBER].hFile == INVALID_HANDLE_VALUE)
   {
      printf("CreateFile error (%d)\n", GetLastError());
      return RTN_ERROR;
   }

   //
   // Convert thread to a fiber, to allow scheduling other fibers
   //
   g_lpFiber[PRIMARY_FIBER]=ConvertThreadToFiber(&fs[PRIMARY_FIBER]);

   if (g_lpFiber[PRIMARY_FIBER] == NULL)
   {
      printf("ConvertThreadToFiber error (%d)\n", GetLastError());
      return RTN_ERROR;
   }

   //
   // Initialize the primary fiber data structure.  We don't use
   // the primary fiber data structure for anything in this sample.
   //
   fs[PRIMARY_FIBER].dwParameter = 0;
   fs[PRIMARY_FIBER].dwFiberResultCode = 0;
   fs[PRIMARY_FIBER].hFile = INVALID_HANDLE_VALUE;

   //
   // Create the Read fiber
   //
   g_lpFiber[READ_FIBER]=CreateFiber(0,ReadFiberFunc,&fs[READ_FIBER]);

   if (g_lpFiber[READ_FIBER] == NULL)
   {
      printf("CreateFiber error (%d)\n", GetLastError());
      return RTN_ERROR;
   }

   fs[READ_FIBER].dwParameter = 0x12345678;

   //
   // Create the Write fiber
   //
   g_lpFiber[WRITE_FIBER]=CreateFiber(0,WriteFiberFunc,&fs[WRITE_FIBER]);

   if (g_lpFiber[WRITE_FIBER] == NULL)
   {
      printf("CreateFiber error (%d)\n", GetLastError());
      return RTN_ERROR;
   }

   fs[WRITE_FIBER].dwParameter = 0x54545454;

   //
   // Switch to the read fiber
   //
   SwitchToFiber(g_lpFiber[READ_FIBER]);

   //
   // We have been scheduled again. Display results from the 
   // read/write fibers
   //
   printf("ReadFiber: result code is %lu, %lu bytes processed\n",
   fs[READ_FIBER].dwFiberResultCode, fs[READ_FIBER].dwBytesProcessed);

   printf("WriteFiber: result code is %lu, %lu bytes processed\n",
   fs[WRITE_FIBER].dwFiberResultCode, fs[WRITE_FIBER].dwBytesProcessed);

   //
   // Delete the fibers
   //
   DeleteFiber(g_lpFiber[READ_FIBER]);
   DeleteFiber(g_lpFiber[WRITE_FIBER]);

   //
   // Close handles
   //
   CloseHandle(fs[READ_FIBER].hFile);
   CloseHandle(fs[WRITE_FIBER].hFile);

   //
   // Free allocated memory
   //
   HeapFree(GetProcessHeap(), 0, g_lpBuffer);
   HeapFree(GetProcessHeap(), 0, fs);

   return RTN_OK;
}

VOID
__stdcall
ReadFiberFunc(
             LPVOID lpParameter
             )
{
   LPFIBERDATASTRUCT fds = (LPFIBERDATASTRUCT)lpParameter;

   //
   // If this fiber was passed NULL for fiber data, just return,
   // causing the current thread to exit
   //
   if (fds == NULL)
   {
      printf("Passed NULL fiber data; exiting current thread.\n");
      return;
   }

   //
   // Display some information pertaining to the current fiber
   //
   DisplayFiberInfo();

   fds->dwBytesProcessed = 0;

   while (1)
   {
      //
      // Read data from file specified in the READ_FIBER structure
      //
      if (!ReadFile(fds->hFile, g_lpBuffer, BUFFER_SIZE, 
         &g_dwBytesRead, NULL))
      {
         break;
      }

      //
      // if we reached EOF, break
      //
      if (g_dwBytesRead == 0) break;

      //
      // Update number of bytes processed in the fiber data structure
      //
      fds->dwBytesProcessed += g_dwBytesRead;

      //
      // Switch to the write fiber
      //
      SwitchToFiber(g_lpFiber[WRITE_FIBER]);
   } // while

   //
   // Update the fiber result code
   //
   fds->dwFiberResultCode = GetLastError();

   //
   // Switch back to the primary fiber
   //
   SwitchToFiber(g_lpFiber[PRIMARY_FIBER]);
}

VOID
__stdcall
WriteFiberFunc(
              LPVOID lpParameter
              )
{
   LPFIBERDATASTRUCT fds = (LPFIBERDATASTRUCT)lpParameter;
   DWORD dwBytesWritten;

   //
   // If this fiber was passed NULL for fiber data, just return,
   // causing the current thread to exit
   //
   if (fds == NULL)
   {
      printf("Passed NULL fiber data; exiting current thread.\n");
      return;
   }

   //
   // Display some information pertaining to the current fiber
   //
   DisplayFiberInfo();

   //
   // Assume all writes succeeded.  If a write fails, the fiber
   // result code will be updated to reflect the reason for failure
   //
   fds->dwBytesProcessed = 0;
   fds->dwFiberResultCode = ERROR_SUCCESS;

   while (1)
   {
      //
      // Write data to the file specified in the WRITE_FIBER structure
      //
      if (!WriteFile(fds->hFile, g_lpBuffer, g_dwBytesRead, 
         &dwBytesWritten, NULL))
      {
         //
         // If an error occurred writing, break
         //
         break;
      }

      //
      // Update number of bytes processed in the fiber data structure
      //
      fds->dwBytesProcessed += dwBytesWritten;

      //
      // Switch back to the read fiber
      //
      SwitchToFiber(g_lpFiber[READ_FIBER]);
   }  // while

   //
   // If an error occurred, update the fiber result code...
   //
   fds->dwFiberResultCode = GetLastError();

   //
   // ...and switch to the primary fiber
   //
   SwitchToFiber(g_lpFiber[PRIMARY_FIBER]);
}

void
DisplayFiberInfo(
                void
                )
{
   LPFIBERDATASTRUCT fds = (LPFIBERDATASTRUCT)GetFiberData();
   LPVOID lpCurrentFiber = GetCurrentFiber();

   //
   // Determine which fiber is executing, based on the fiber address
   //
   if (lpCurrentFiber == g_lpFiber[READ_FIBER])
      printf("Read fiber entered");
   else
   {
      if (lpCurrentFiber == g_lpFiber[WRITE_FIBER])
         printf("Write fiber entered");
      else
      {
         if (lpCurrentFiber == g_lpFiber[PRIMARY_FIBER])
            printf("Primary fiber entered");
         else
            printf("Unknown fiber entered");
      }
   }

   //
   // Display dwParameter from the current fiber data structure
   //
   printf(" (dwParameter is 0x%lx)\n", fds->dwParameter);
}
0 голосов
/ 13 апреля 2020

Вот как я бы реализовал свою собственную многозадачность для совместного выполнения:

enum class eStep
{
    START,
    STEP1,
    STEP2,
    DONE
};

struct sLongFuncContext
{
    //whatver is meaning full to go from one step to the next
};

eStep long_func_split_in_steps(eStep aStep,sLongFuncContext &aContext)
{
    eStep next;
    switch (aStep)
    {
        case eStep::START:
        // execute first part of func, save context
        next = eStep::STEP1;
        break;

        case eStep::STEP1:
         // execute 2nd part of func, save context
        next = eStep::STEP2;
        break;

        case eStep::STEP2:
        next = eStep::DONE;
        break;
        // repeat 

    };
    return (next);
}

int main()
{
    eStep step = eStep::START;
    sLongFuncContext context;
    while (step != eStep::DONE)
    {
        // do a part of the long function
        step = long_func_split_in_steps(step,context);

        // handle mouse events
        // ...
    }



    return 0;
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...