Как создать рабочий поток, который запускается по сигналу, а также отдыхает - PullRequest
0 голосов
/ 12 февраля 2019

Я хочу создать рабочий поток, который запускается по сигналу (задача была добавлена ​​в список общих задач) и по окончании будет остановлен.

Требования:

  1. Другие системные потокиможет добавить задачу в любое время
  2. рабочий поток должен отдыхать, если ему нечего делать
  3. если добавлено больше задач, когда рабочий поток активен, он также должен их завершить.
  4. рабочий поток может отдыхать часами -> работа поступает как капли дождя (иногда у нас буря)

После того, как поток добавляет новое задание в рабочий список (общий список), он сигнализирует (AutoRestEvent.Set ()) рабочий поток для начала работы.

У меня есть условие гонки между функциями Set () и WaitOne ().

    public static void AddWork(object obj)
    {
        Monitor.Enter(_syncO);
        _workingList.Add(obj);
        _signal.Set();
        Monitor.Exit(_syncO);
    }

    static object _syncO = new object();
    static AutoResetEvent _signal = new AutoResetEvent(false);
    static List<object> _workingList = new List<object>();
    static void DoWork()
    {
        Thread tradeThread1 = new Thread(() =>
        {
            Thread.CurrentThread.IsBackground = true;
            string tradeMessage = string.Empty;

            while (true)
            {
                Monitor.Enter(_syncO);
                var arr = _workingList.ToArray();
                Monitor.Exit(_syncO);

                // race condition when the set happens just before the 
                // thread was locked
                if (arr.Count() == 0)
                    _signal.WaitOne();

                Monitor.Enter(_syncO);
                arr = _workingList.ToArray();
                Monitor.Exit(_syncO);

                int count = 0;
                var deleteList = new List<object>();
                while (true)
                {
                    foreach (var item in arr)
                    {
                        // the value is changing every iteration. 
                       // this is why I need the Sleep at the end
                        bool b = Handle4(item);
                        if (b)
                            deleteList.Add(item);
                    }

                    if (count == 100) 
                        break;

                    Thread.Sleep(100);
                    count++;
                }

                // remove done tasks from _workingList
                RemoveItems(deleteList);                    

                // we can't close, so we re-set footprints(notifications) on our price cable. -> execute on broker tick(the right tick)
                // reHang trade on cable
                foreach (var item in _workingList)
                {
                    // re-use the undeleted tasks
                }

            }
        });

        tradeThread1.Start();
    }

Основано на помощи @John Wu Iпридумать следующее решение: BlockingCollection будет действовать как ворота для рабочего потока.каждая итерация будет копировать новое задание

private static void AddWork(object tap)
    {
        queue.Add(tap);
    }

    private static BlockingCollection<object> queue = new BlockingCollection<object>();

    static void Work()
    {
        Thread tradeThread1 = new Thread(() =>
        {                
            while (true)
            {
                var workingList = new List<object>();
                var deleteList = new List<object>();
                var reEvaluateList = new List<object>();
                while (true)
                {
                    if (workingList.Count() == 0)
                    {
                        // thread will wait until new work arrives -> it will start working again on the first task to come in.
                        workingList.Add(queue.Take());
                    }

                    foreach (var item in workingList)
                    {
                        bool b = Handle4(item);
                        if (b)
                            deleteList.Add(item);
                        else
                            item.ExitCounter++;

                        if (item.ExitCounter == 1000)
                            reEvaluateList.Add(item);
                    }

                    RemoveItems(deleteList, workingList);

                    // we can't close, so we re-set 
                    // we reevaluate tasks that are working for X amount of time and didn't finish
                    foreach (var item in reEvaluateList)
                        ReEvaluate(item);

                    RemoveItems(reEvaluateList, workingList);

                    // wait.. the item change-over-time, so a wait is a type of calculation.
                    Thread.Sleep(100);

                    // we want to avoid locking if we still have task to process                                                
                    if (queue.Count() == 0)
                        continue;

                    // add new work to local list
                    workingList.Add(queue.Take());
                }
            }
        });

        tradeThread1.Start();
    }

Это выглядит немного грязно.Есть идеи, как сделать это лучше?

...