Как остановить все темы, используя manualResetEvent или любое другое предложение, чтобы остановить все мои темы? - PullRequest
0 голосов
/ 16 января 2019

Я использую 10 потоков, каждый поток получает 10 записей с нулевым статусом, а затем обновить статус до "Q". Теперь эти записи запрашиваются разными API, и после успешного ответа я обновляю статус до «C», все идет хорошо. В качестве ограничителя для остановки всех потоков я использовал ManualResetEvent после того, как (j> 30) ManualResetEven.Set () я проверил условие

if (_stopper.WaitOne(0, false))
  {
     break;
  } 

чтобы остановить цикл while и обновить состояние до нуля для оставшихся записей каждого потока, чей статус «Q». При остановке я обновляю статус с «Q» до нуля во всех записях 10 потоков, проблема в том, что один записи моего потока не обновляет статус до нуля от "Q"?

Вот мой код:

 public void MainProcess()
    {
        int totalThread = 10;
        Thread[] ThreadNum = new Thread[totalThread];
        DataTable dt = new DataTable();
        int j = 1;
        while (true)
        {
            if (_stopper.WaitOne(0, false))
            {
                lov.LogWriter("MainProcess stopper. |" + dt.Rows[0]["que_batch"].ToString());
                string var = sp_Queue_upd(dt.Rows[0]["que_batch"].ToString(), "Q");
                lov.LogWriter("MainProcess stopper sp_Queue_upd " + var + " stopper call, current number is " + que_BatchNumberIndex + " Batch No. :" + que_BatchNumber + ": mainthread End of While");//+ (string)argArray.GetValue(1));

                break;
            }
            if (j > 30)
            {
                _stopper.Set();
                lov.LogWriter("MainProcess stopper.Set");
            }
            if (threadCount.ThreadCount < totalThread && threadCount.ThreadCount >= 0)
            {
                if (ThreadNum[threadCount.ThreadCount] == null)
                {
                    dt = sp_get_entry_spool();
                    if (dt.Rows.Count <= 0)
                    {
                        Thread.Sleep(1000);
                        continue;
                    }
                    object[] objArr = { dt, threadCount.ThreadCount.ToString(), dt.Rows[0]["que_batch"].ToString() };
                    ThreadNum[threadCount.ThreadCount] = new Thread(new ParameterizedThreadStart(Process));
                    ThreadNum[threadCount.ThreadCount].Name = dt.Rows[0]["que_batch"].ToString();
                    ThreadNum[threadCount.ThreadCount].IsBackground = true;
                    ThreadNum[threadCount.ThreadCount].SetApartmentState(ApartmentState.MTA);
                    ThreadNum[threadCount.ThreadCount].Start(objArr);
                    lov.LogWriter("thread Start: Batch No. :" + dt.Rows[0]["que_batch"].ToString() + "|Actual Batch :" + j + "|ThreadCount" + threadCount.ThreadCount.ToString());

                }
                else if (!ThreadNum[threadCount.ThreadCount].IsAlive)
                {
                    dt = sp_get_entry_spool();
                    if (dt.Rows.Count <= 0)
                    {
                        Thread.Sleep(1000);
                        continue;
                    }
                    object[] objArr = new object[3] { dt, threadCount.ThreadCount.ToString(), dt.Rows[0]["que_batch"].ToString() };
                    ThreadNum[threadCount.ThreadCount] = new Thread(new ParameterizedThreadStart(Process));
                    ThreadNum[threadCount.ThreadCount].Name = dt.Rows[0]["que_batch"].ToString();
                    ThreadNum[threadCount.ThreadCount].IsBackground = true;
                    ThreadNum[threadCount.ThreadCount].SetApartmentState(ApartmentState.MTA);
                    ThreadNum[threadCount.ThreadCount].Start(objArr);
                    lov.LogWriter("thread Start: Batch No. :" + dt.Rows[0]["que_batch"].ToString() + "|Actual Batch :" + j + "|ThreadCount" + threadCount.ThreadCount.ToString());
                    j++;
                }
                threadCount.ThreadCount++;

            }
            else if (threadCount.ThreadCount >= totalThread)
            {
                threadCount.ThreadCount = 0;
            }
        }
        if (statusQueue)
        {
            string var = sp_Queue_upd(que_BatchNumber, "Q");
            lov.LogWriter("Back to MainProcess" + var + " stopper call, current number is " + que_BatchNumberIndex + " Batch No. :" + que_BatchNumber + ": mainthread End of While" );//+ (string)argArray.GetValue(1)); 
        }
    }
    public void Process(object ob)
    {
        Array argArray = new object[3];
        argArray = (Array)ob;
        DataTable Data = (DataTable)argArray.GetValue(0);
        string var = "";
        try
        {
            Core_Incor objCore = null;
            int i = 0;
            for (i = 0; i < Data.Rows.Count; i++)
            {
                #region Loop
                var = "";
                try
                {
                    #region Loop
                    if (_stopper.WaitOne(0, false))
                    {   var = sp_Queue_upd(Data.Rows[i]["que_batch"].ToString(), "Q");
                        lov.LogWriter("sp_Queue_upd " + var + " stopper call, current number is " + i + " Batch No. :" + Data.Rows[i]["que_batch"].ToString() + ":" + (string)argArray.GetValue(1));

                        break;
                    }

                    if (Data.Rows[i]["mesg_type"].ToString() == "CQ")
                    { // mark status to "C" }
                    else if (Data.Rows[i]["mesg_type"].ToString() == "IN")
                    { // mark status to "C" }
                    else if (Data.Rows[i]["mesg_type"].ToString() == "CH")
                    { // mark status to "C" }
                    else if (Data.Rows[i]["mesg_type"].ToString() == "IF")
                    { // mark status to "C" }
                    else if (Data.Rows[i]["mesg_type"].ToString() == "IB")
                    { // mark status to "C" }
                    else if (Data.Rows[i]["mesg_type"].ToString() == "IB")
                    { // mark status to "C" }
                    else if (Data.Rows[i]["mesg_type"].ToString() == "BA")
                    { // mark status to "C" } 
                     else if (Data.Rows[i]["mesg_type"].ToString() == "RT")
                    { // mark status to "C" }
                    else if (Data.Rows[i]["mesg_type"].ToString() == "CA" || Data.Rows[i]["mesg_type"].ToString() == "UB")
                    { // mark status to "C" }

                }
                catch (Exception ex)
                {
                    InsertReponce("1", ex.Message, Data.Rows[i]["rowid"].ToString());
                }
                #endregion
            }
        }
        catch (Exception ex)
        {
            var = "1;MainException:FE Process :" + ex.Message;
            lov.LogWriter(var);
        }
    }
...