Я использую 10 потоков, каждый поток получает 10 записей с нулевым статусом, а затем
обновить статус до "Q". Теперь эти записи запрашиваются разными API, и после успешного ответа я обновляю статус до «C», все идет хорошо. В качестве ограничителя для остановки всех потоков я использовал ManualResetEvent после того, как (j> 30) ManualResetEven.Set () я проверил условие
if (_stopper.WaitOne(0, false))
{
break;
}
чтобы остановить цикл while и обновить состояние до нуля для оставшихся записей каждого потока, чей статус «Q». При остановке я обновляю статус с «Q» до нуля во всех записях 10 потоков, проблема в том, что один записи моего потока не обновляет статус до нуля от "Q"?
Вот мой код:
public void MainProcess()
{
int totalThread = 10;
Thread[] ThreadNum = new Thread[totalThread];
DataTable dt = new DataTable();
int j = 1;
while (true)
{
if (_stopper.WaitOne(0, false))
{
lov.LogWriter("MainProcess stopper. |" + dt.Rows[0]["que_batch"].ToString());
string var = sp_Queue_upd(dt.Rows[0]["que_batch"].ToString(), "Q");
lov.LogWriter("MainProcess stopper sp_Queue_upd " + var + " stopper call, current number is " + que_BatchNumberIndex + " Batch No. :" + que_BatchNumber + ": mainthread End of While");//+ (string)argArray.GetValue(1));
break;
}
if (j > 30)
{
_stopper.Set();
lov.LogWriter("MainProcess stopper.Set");
}
if (threadCount.ThreadCount < totalThread && threadCount.ThreadCount >= 0)
{
if (ThreadNum[threadCount.ThreadCount] == null)
{
dt = sp_get_entry_spool();
if (dt.Rows.Count <= 0)
{
Thread.Sleep(1000);
continue;
}
object[] objArr = { dt, threadCount.ThreadCount.ToString(), dt.Rows[0]["que_batch"].ToString() };
ThreadNum[threadCount.ThreadCount] = new Thread(new ParameterizedThreadStart(Process));
ThreadNum[threadCount.ThreadCount].Name = dt.Rows[0]["que_batch"].ToString();
ThreadNum[threadCount.ThreadCount].IsBackground = true;
ThreadNum[threadCount.ThreadCount].SetApartmentState(ApartmentState.MTA);
ThreadNum[threadCount.ThreadCount].Start(objArr);
lov.LogWriter("thread Start: Batch No. :" + dt.Rows[0]["que_batch"].ToString() + "|Actual Batch :" + j + "|ThreadCount" + threadCount.ThreadCount.ToString());
}
else if (!ThreadNum[threadCount.ThreadCount].IsAlive)
{
dt = sp_get_entry_spool();
if (dt.Rows.Count <= 0)
{
Thread.Sleep(1000);
continue;
}
object[] objArr = new object[3] { dt, threadCount.ThreadCount.ToString(), dt.Rows[0]["que_batch"].ToString() };
ThreadNum[threadCount.ThreadCount] = new Thread(new ParameterizedThreadStart(Process));
ThreadNum[threadCount.ThreadCount].Name = dt.Rows[0]["que_batch"].ToString();
ThreadNum[threadCount.ThreadCount].IsBackground = true;
ThreadNum[threadCount.ThreadCount].SetApartmentState(ApartmentState.MTA);
ThreadNum[threadCount.ThreadCount].Start(objArr);
lov.LogWriter("thread Start: Batch No. :" + dt.Rows[0]["que_batch"].ToString() + "|Actual Batch :" + j + "|ThreadCount" + threadCount.ThreadCount.ToString());
j++;
}
threadCount.ThreadCount++;
}
else if (threadCount.ThreadCount >= totalThread)
{
threadCount.ThreadCount = 0;
}
}
if (statusQueue)
{
string var = sp_Queue_upd(que_BatchNumber, "Q");
lov.LogWriter("Back to MainProcess" + var + " stopper call, current number is " + que_BatchNumberIndex + " Batch No. :" + que_BatchNumber + ": mainthread End of While" );//+ (string)argArray.GetValue(1));
}
}
public void Process(object ob)
{
Array argArray = new object[3];
argArray = (Array)ob;
DataTable Data = (DataTable)argArray.GetValue(0);
string var = "";
try
{
Core_Incor objCore = null;
int i = 0;
for (i = 0; i < Data.Rows.Count; i++)
{
#region Loop
var = "";
try
{
#region Loop
if (_stopper.WaitOne(0, false))
{ var = sp_Queue_upd(Data.Rows[i]["que_batch"].ToString(), "Q");
lov.LogWriter("sp_Queue_upd " + var + " stopper call, current number is " + i + " Batch No. :" + Data.Rows[i]["que_batch"].ToString() + ":" + (string)argArray.GetValue(1));
break;
}
if (Data.Rows[i]["mesg_type"].ToString() == "CQ")
{ // mark status to "C" }
else if (Data.Rows[i]["mesg_type"].ToString() == "IN")
{ // mark status to "C" }
else if (Data.Rows[i]["mesg_type"].ToString() == "CH")
{ // mark status to "C" }
else if (Data.Rows[i]["mesg_type"].ToString() == "IF")
{ // mark status to "C" }
else if (Data.Rows[i]["mesg_type"].ToString() == "IB")
{ // mark status to "C" }
else if (Data.Rows[i]["mesg_type"].ToString() == "IB")
{ // mark status to "C" }
else if (Data.Rows[i]["mesg_type"].ToString() == "BA")
{ // mark status to "C" }
else if (Data.Rows[i]["mesg_type"].ToString() == "RT")
{ // mark status to "C" }
else if (Data.Rows[i]["mesg_type"].ToString() == "CA" || Data.Rows[i]["mesg_type"].ToString() == "UB")
{ // mark status to "C" }
}
catch (Exception ex)
{
InsertReponce("1", ex.Message, Data.Rows[i]["rowid"].ToString());
}
#endregion
}
}
catch (Exception ex)
{
var = "1;MainException:FE Process :" + ex.Message;
lov.LogWriter(var);
}
}