C # Подождите, пока все потоки прервутся в ThreadPool - PullRequest
4 голосов
/ 18 февраля 2010

У меня есть один основной поток и много других фоновых потоков.

Основное использование этих фоновых потоков - это запрос данных (много запросов из Интернета, поэтому я создаю несколько потоков: чтобы избежатьотставание пользовательского интерфейса).

Когда дело доходит до экспорта данных в основной поток (пользовательский интерфейс), мне нужно подождать, пока все остальные потоки не будут завершены.

Мой код:

//...code to open save file dialog...

//this loop is to wait for all the threads finish their query
//QueryThread.threadCount is the count of the background threads
while (QueryThread.threadCount != 0)
{
    Thread.CurrentThread.Join(1000);
    Console.WriteLine(QueryThread.threadCount);
}

//...code to export data...

Если я прокомментирую цикл while, программа будет работать гладко, но некоторые из моих экспортированных данных будут иметь возможность показывать некоторый «нежелательный» материал, так как некоторые фоновые потоки еще не завершены.их работа.

Однако вышеприведенный цикл while бесконечен, threadCount никогда не изменяется, что означает, что при использовании метода Join () никакой фоновый поток не выполняется.

Почему фонтемы заблокированы и как я могу решить проблему?

Большое спасибо!

Ответы [ 5 ]

3 голосов
/ 18 февраля 2010

Вы вызываете метод Join в текущем потоке, который не имеет особого смысла.Вы должны вызывать его в своих рабочих потоках:

foreach (Thread thread in workerThreads)
{
    thread.Join(1000);
}

К сожалению, этот тип отрицает цель использования потоков, потому что он блокирует вызов до тех пор, пока все остальные потоки не будут завершены.

RunWorkerCompleted Событие BackgroundWorker может использоваться для уведомления о завершении фоновой задачи и выполнения обновлений в форме.

2 голосов
/ 18 февраля 2010

Я думаю, вы хотите посмотреть на сигнализацию. Есть сигналы (ManualResetEvent / AutoResetEvent) для ваших потоков. Установите () соответствующий дескриптор сигнала в рабочем потоке, когда это будет сделано. В главном потоке выполните «WaitAll (signal1, signal2, signal3)», чтобы дождаться завершения ваших рабочих потоков.

Надеюсь, это поможет,

2 голосов
/ 18 февраля 2010

Ваша реализация неверна, и вы не должны использовать Join как примитив синхронизации между вашими потоками.

Что вам нужно сделать, это реализовать шаблон производитель-потребитель .Это позволит вам иметь потоки, ожидающие выполнения работы, а затем ожить, чтобы выполнить эту работу, когда вы поместите ее в очередь.

Однако, изменение, которое я бы сделал, это то, что из потока пользовательского интерфейса,не добавляйте данные непосредственно в очередь, которую разделяют производитель и потребитель.Вместо этого сделайте копию этих данных и затем поместите , что в очередь.

Для получения дополнительной информации о том, как реализовать шаблон производитель-потребитель в .NET,Я предлагаю вам прочитать документацию MSDN под названием « Как: синхронизировать источник и поток потребителя (Руководство по программированию в C #) »

1 голос
/ 19 февраля 2010

Я не удержался и попробовал сам. Я уверен, что есть место для улучшений, но я думаю, что это показывает, как решить некоторые проблемы с многопоточностью, включая оригинальный вопрос.

Form.cs


namespace STAFormWithThreadPoolSync
{
    internal delegate void WorkerEvent(WorkerEventInfo info);

    public partial class Form1 : Form
    {
        // We'll create a state object for each worker process
        List<WorkerState> workerStates = new List<WorkerState>();

        public Form1()
        {
            InitializeComponent();
        }

        // Executed in the main thread
        private void button1_Click(object sender, EventArgs e)
        {
            workersList.Items.Clear();

            // Read the amount of thread we should start from the form
            int threadCountToUse = (int)ThreadCount.Value;

            WorkerEvent woEvent = new WorkerEvent(this.workerEventOccured);

            // Start up all threads
            for (int counter = 0; counter < threadCountToUse; ++counter)
            {
                // An object we can pass values into for the worker process to use.
                WorkerState workerState = new WorkerState();

                workerState.OnStarted += woEvent;
                workerState.OnFinished += woEvent;

                // Register for the signal (and store its registered wait handle in the stateObj, which we also pass into the parameters!)
                workerState.registeredWaitHandle = ThreadPool.RegisterWaitForSingleObject(workerState.finishSignal, this.ItemHasFinished, workerState, -1, true); 

                // Store the state object for later use.
                workerStates.Add(workerState);
            }

            WorkersProgress.Minimum = 0;
            WorkersProgress.Maximum = workerStates.Count;

            workerStates.ForEach(workerState =>
                {
                    // Fire of the worker thread (with the state object)
                    ThreadPool.QueueUserWorkItem(this.ProcessItem, workerState);
                }
            );



            button1.Enabled = false;
            CurrentResult.Value = 0;
            CurrentResultLabel.Text = "Current value";
            ProgressTimer.Start();
        }

        // event is run on the callers thread, so carefull accessing our controls on our form.
        internal void workerEventOccured(WorkerEventInfo info)
        {
            if (this.workersList.InvokeRequired)
            {
                WorkerEvent workerEvent = new WorkerEvent(workerEventOccured);
                this.Invoke(workerEvent, new object[] { info });
            }
            else
            {
                switch (info.eventType)
                {
                    case EventType.WorkerStarted:
                        this.workersList.Items.Add(String.Format("Worker started on thread : {0}", info.workerState.threadId));
                        break;

                    case EventType.WorkerEnded:
                        this.workersList.Items.Add(String.Format("Worker finished on thread : {0}", info.workerState.threadId));
                        break;
                    case EventType.AllWorkersFinished:
                        this.workersList.Items.Add("ALL workers finished");
                        ProgressTimer.Stop();
                        button1.Enabled = true;
                        CurrentResultLabel.Text = "Final value";
                        break;
                }
            }
        }

        // Executed in threadpool thread.
        private void ProcessItem(object state)
        {
            WorkerState workerState = state as WorkerState;
            int threadId = Thread.CurrentThread.ManagedThreadId;
            workerState.threadId = threadId.ToString();

            WorkerEventInfo weInfo = new WorkerEventInfo();
            weInfo.eventType = EventType.WorkerStarted;
            weInfo.workerState = workerState;
            workerState.Started(weInfo);

            // Simulate work for ((threadid / 2) seconds.
            Thread.Sleep((threadId * 500));

            // Set the result in the state object to the threadId; 
            workerState.result = threadId;

            // Signal that this thread is done.
            workerState.finishSignal.Set();
        }

        // Executed in threadpool thread
        private void ItemHasFinished(object state, bool timedOut)
        {
            // get our state object
            WorkerState workerState = state as WorkerState;

            WorkerEventInfo weInfo = new WorkerEventInfo();
            weInfo.eventType = EventType.WorkerEnded;
            weInfo.workerState = workerState;

            workerState.Finished(weInfo);
        }

        private void ProgressTimer_Tick(object sender, EventArgs e)
        {
            List<WorkerState> removeStates = new List<WorkerState>();
            workerStates.ForEach(workerState =>
                {
                    if (workerState.finishSignal.WaitOne(0))
                    {
                        CurrentResult.Value += workerState.result;
                        removeStates.Add(workerState);
                    }
                }
            );

            removeStates.ForEach(workerState =>
                {
                    workerState.registeredWaitHandle.Unregister(workerState.finishSignal);
                    workerStates.Remove(workerState);
                }
            );


            WorkersProgress.Value = workerStates.Count;
            if (workerStates.Count == 0)
            {
                WorkerEventInfo weInfo = new WorkerEventInfo();
                weInfo.eventType = EventType.AllWorkersFinished;
                weInfo.workerState = null;
                this.workerEventOccured(weInfo);
            }

        }
    }

    internal class WorkerState
    {
        internal string threadId = "";
        internal int    result = 0;
        internal RegisteredWaitHandle registeredWaitHandle = null;
        internal AutoResetEvent finishSignal = new AutoResetEvent(false);
        internal event WorkerEvent OnStarted = new WorkerEvent( (info) => {});
        internal event WorkerEvent OnFinished = new WorkerEvent((info) => { });

        internal void Started(WorkerEventInfo info)
        {
            OnStarted(info);
        }

        internal void Finished(WorkerEventInfo info)
        {
            OnFinished(info);
            this.finishSignal.Set();
        }
    }

    internal enum EventType
    {
        WorkerStarted,
        WorkerEnded,
        AllWorkersFinished
    }

    internal class WorkerEventInfo
    {
        internal EventType eventType;
        internal WorkerState workerState;
    }
}


Form.Designer.cs


namespace STAFormWithThreadPoolSync
{
    partial class Form1
    {
        /// 
        /// Required designer variable.
        /// 
        private System.ComponentModel.IContainer components = null;

        /// 
        /// Clean up any resources being used.
        /// 
        /// true if managed resources should be disposed; otherwise, false.
        protected override void Dispose(bool disposing)
        {
            if (disposing && (components != null))
            {
                components.Dispose();
            }
            base.Dispose(disposing);
        }

        #region Windows Form Designer generated code

        /// 
        /// Required method for Designer support - do not modify
        /// the contents of this method with the code editor.
        /// 
        private void InitializeComponent()
        {
            this.components = new System.ComponentModel.Container();
            this.button1 = new System.Windows.Forms.Button();
            this.ThreadCount = new System.Windows.Forms.NumericUpDown();
            this.workersList = new System.Windows.Forms.ListView();
            this.WorkerProcessColumn = new System.Windows.Forms.ColumnHeader();
            this.ProgressTimer = new System.Windows.Forms.Timer(this.components);
            this.WorkersProgress = new System.Windows.Forms.ProgressBar();
            this.CurrentResultLabel = new System.Windows.Forms.Label();
            this.CurrentResult = new System.Windows.Forms.NumericUpDown();
            this.label2 = new System.Windows.Forms.Label();
            ((System.ComponentModel.ISupportInitialize)(this.ThreadCount)).BeginInit();
            ((System.ComponentModel.ISupportInitialize)(this.CurrentResult)).BeginInit();
            this.SuspendLayout();
            // 
            // button1
            // 
            this.button1.Location = new System.Drawing.Point(212, 19);
            this.button1.Name = "button1";
            this.button1.Size = new System.Drawing.Size(93, 23);
            this.button1.TabIndex = 0;
            this.button1.Text = "Start threads";
            this.button1.UseVisualStyleBackColor = true;
            this.button1.Click += new System.EventHandler(this.button1_Click);
            // 
            // ThreadCount
            // 
            this.ThreadCount.Location = new System.Drawing.Point(23, 21);
            this.ThreadCount.Minimum = new decimal(new int[] {
            2,
            0,
            0,
            0});
            this.ThreadCount.Name = "ThreadCount";
            this.ThreadCount.Size = new System.Drawing.Size(183, 20);
            this.ThreadCount.TabIndex = 1;
            this.ThreadCount.Value = new decimal(new int[] {
            4,
            0,
            0,
            0});
            // 
            // workersList
            // 
            this.workersList.Columns.AddRange(new System.Windows.Forms.ColumnHeader[] {
            this.WorkerProcessColumn});
            this.workersList.Location = new System.Drawing.Point(23, 80);
            this.workersList.Name = "workersList";
            this.workersList.Size = new System.Drawing.Size(486, 255);
            this.workersList.TabIndex = 3;
            this.workersList.UseCompatibleStateImageBehavior = false;
            this.workersList.View = System.Windows.Forms.View.Details;
            // 
            // WorkerProcessColumn
            // 
            this.WorkerProcessColumn.Text = "Worker process";
            this.WorkerProcessColumn.Width = 482;
            // 
            // ProgressTimer
            // 
            this.ProgressTimer.Interval = 200;
            this.ProgressTimer.Tick += new System.EventHandler(this.ProgressTimer_Tick);
            // 
            // WorkersProgress
            // 
            this.WorkersProgress.Location = new System.Drawing.Point(112, 341);
            this.WorkersProgress.Name = "WorkersProgress";
            this.WorkersProgress.Size = new System.Drawing.Size(397, 24);
            this.WorkersProgress.TabIndex = 4;
            // 
            // CurrentResultLabel
            // 
            this.CurrentResultLabel.AutoSize = true;
            this.CurrentResultLabel.Location = new System.Drawing.Point(578, 266);
            this.CurrentResultLabel.Name = "CurrentResultLabel";
            this.CurrentResultLabel.Size = new System.Drawing.Size(74, 13);
            this.CurrentResultLabel.TabIndex = 5;
            this.CurrentResultLabel.Text = "Current Result";
            // 
            // CurrentResult
            // 
            this.CurrentResult.Location = new System.Drawing.Point(581, 282);
            this.CurrentResult.Maximum = new decimal(new int[] {
            -1593835520,
            466537709,
            54210,
            0});
            this.CurrentResult.Name = "CurrentResult";
            this.CurrentResult.ReadOnly = true;
            this.CurrentResult.Size = new System.Drawing.Size(169, 20);
            this.CurrentResult.TabIndex = 6;
            // 
            // label2
            // 
            this.label2.AutoSize = true;
            this.label2.Location = new System.Drawing.Point(25, 352);
            this.label2.Name = "label2";
            this.label2.Size = new System.Drawing.Size(81, 13);
            this.label2.TabIndex = 7;
            this.label2.Text = "processing load";
            // 
            // Form1
            // 
            this.AutoScaleDimensions = new System.Drawing.SizeF(6F, 13F);
            this.AutoScaleMode = System.Windows.Forms.AutoScaleMode.Font;
            this.ClientSize = new System.Drawing.Size(762, 377);
            this.Controls.Add(this.label2);
            this.Controls.Add(this.CurrentResult);
            this.Controls.Add(this.CurrentResultLabel);
            this.Controls.Add(this.WorkersProgress);
            this.Controls.Add(this.workersList);
            this.Controls.Add(this.ThreadCount);
            this.Controls.Add(this.button1);
            this.Name = "Form1";
            this.Text = "Form1";
            ((System.ComponentModel.ISupportInitialize)(this.ThreadCount)).EndInit();
            ((System.ComponentModel.ISupportInitialize)(this.CurrentResult)).EndInit();
            this.ResumeLayout(false);
            this.PerformLayout();

        }

        #endregion

        private System.Windows.Forms.Button button1;
        private System.Windows.Forms.NumericUpDown ThreadCount;
        private System.Windows.Forms.ListView workersList;
        private System.Windows.Forms.ColumnHeader WorkerProcessColumn;
        private System.Windows.Forms.Timer ProgressTimer;
        private System.Windows.Forms.ProgressBar WorkersProgress;
        private System.Windows.Forms.Label CurrentResultLabel;
        private System.Windows.Forms.NumericUpDown CurrentResult;
        private System.Windows.Forms.Label label2;
    }
}

Надеюсь, это поможет,

0 голосов
/ 19 февраля 2010

Я решил проблему, изменив подход к модели «производитель-потребитель».

Спасибо всем.Пожалуйста, посмотрите на эту ссылку (предоставленную casperOne выше), но будьте осторожны, не следите за реализацией Microsoft ....

Перейти здесь вместо этого дастВы лучше ответите.

Конечно, я внес некоторые изменения, тип очереди в моем случае - «Делегат».

public static class QueryThread
{
    private static SyncEvents _syncEvents = new SyncEvents();
    private static Queue<Delegate> _queryQueue = new Queue<Delegate>();

    static Producer queryProducer;
    static Consumer queryConsumer;

    public static void init()
    {
        queryProducer = new Producer(_queryQueue, _syncEvents);
        queryConsumer = new Consumer(_queryQueue, _syncEvents);

        Thread producerThread = new Thread(queryProducer.ThreadRun);
        Thread consumerThread = new Thread(queryConsumer.ThreadRun);

        producerThread.IsBackground = true;
        consumerThread.IsBackground = true;

        producerThread.Start();
        consumerThread.Start();
    }

    public static void Enqueue(Delegate item)
    {
        queryQueue.Enqueue(item);
    }
}

Когда в основном потоке требуется запрос,ставить делегат в очередь на функцию, которая делает запрос, вызывая Enqueue (элемент Delegate).Это добавит делегата в «частную» очередь источника.

Производитель добавит элементы в своей собственной очереди в общую очередь в подходящее время (например, генерирует случайное число и помещает его в общую очередь).в примере msdn).

Потребитель исключает делегатов из очереди и запускает их.

Спасибо всем за помощь.=]

...