Как мне создать динамические задачи / потоки для запуска той же функции / метода, в которой задачи / потоки управляются из базы данных? - PullRequest
0 голосов
/ 24 сентября 2018

Я создаю сервис, который будет вызывать задачи для выполнения.Это таблица управляемая и динамичная.Таким образом, на каждом тике службы (каждые 10 секунд) он вызывает таблицу SQL и выясняет, что активно.Если он активен, он создаст новую задачу и поместит ее в список объектов.Каждая задача вызывает одну и ту же функцию, но я передаю уникальный datarow (который содержит уникальный идентификатор из таблицы данных) в параметры передачи для метода.

Когда задачи выполняются одновременно, я получаю случайные ошибки (от вызовов базы данных), которые не имеют отношения к задаче, которая должна выполняться.

Итак, вопрос в том,Возможно ли, что задачи, выполняющие один и тот же метод / функцию в одно и то же время, будут сталкиваться друг с другом и вызывать пересечение потоков?

Вот мой код:

namespace ReportService
{
public partial class Service1 : ServiceBase
{
    //public timer
    private static System.Timers.Timer timerReports = null;

    //collection of tasks
    //private static BlockingCollection<ReportTasks> tasksCollection = new BlockingCollection<ReportTasks>();
    private static List<ReportTasks> tasksCollection = new List<ReportTasks>();

    #region Service functions

    public Service1()
    {
        InitializeComponent();
    }


    protected override void OnStart(string[] args)
    {
        try
        {
            ExceptionInfo exceptioninfo = new ExceptionInfo();
            exceptioninfo.LogType = "L";
            exceptioninfo.ClassName = MethodBase.GetCurrentMethod().DeclaringType.Name.ToString();
            exceptioninfo.MethodName = MethodBase.GetCurrentMethod().Name;
            exceptioninfo.OptionalMessage = "Service On";

            Log.WriteToLog(exceptioninfo, ConfigSettings.GetServiceSettings());

            timerReports = new System.Timers.Timer();
            timerReports.Interval = 30000;  //30 secconds
            timerReports.Elapsed += new ElapsedEventHandler(this.timerReports_Tick);
            timerReports.Enabled = true;
        }
        catch (Exception ex)
        {
            //Get Service settings
            dynamic ServiceSettings = ConfigSettings.GetServiceSettings();

            if (ex.Data["WrittenToLog"] == null)
            {
                //set new object
                ex.Data.Add("WrittenToLog", true);

                //build exception object
                ExceptionInfo exceptioninfo = new ExceptionInfo();
                exceptioninfo.LogType = "E";
                exceptioninfo.ClassName = MethodBase.GetCurrentMethod().DeclaringType.Name.ToString();
                exceptioninfo.MethodName = MethodBase.GetCurrentMethod().Name;
                exceptioninfo.InnerException = ex;

                //write to log
                Log.WriteToLog(exceptioninfo, ServiceSettings);
            }
            else if ((bool)ex.Data["WrittenToLog"] == false)
            {
                //set written to log as true
                ex.Data["WrittenToLog"] = true;

                //build exception object
                ExceptionInfo exceptioninfo = new ExceptionInfo();
                exceptioninfo.LogType = "E";
                exceptioninfo.ClassName = MethodBase.GetCurrentMethod().DeclaringType.Name.ToString();
                exceptioninfo.MethodName = MethodBase.GetCurrentMethod().Name;
                exceptioninfo.InnerException = ex;

                //write to log
                Log.WriteToLog(exceptioninfo, ServiceSettings);
            }

            //throw exception to back out of process
            //throw ex;
        }
    }

    protected override void OnStop()
    {
        try
        {

            timerReports.Enabled = false;

            ExceptionInfo exceptioninfo = new ExceptionInfo();
            exceptioninfo.LogType = "L";
            exceptioninfo.ClassName = MethodBase.GetCurrentMethod().DeclaringType.Name.ToString();
            exceptioninfo.MethodName = MethodBase.GetCurrentMethod().Name;
            exceptioninfo.OptionalMessage = "Service Off";

            Log.WriteToLog(exceptioninfo, ConfigSettings.GetServiceSettings());

        }
        catch (Exception ex)
        {
            //Get Service settings
            dynamic ServiceSettings = ConfigSettings.GetServiceSettings();

            if (ex.Data["WrittenToLog"] == null)
            {
                //set new object
                ex.Data.Add("WrittenToLog", true);

                //build exception object
                ExceptionInfo exceptioninfo = new ExceptionInfo();
                exceptioninfo.LogType = "E";
                exceptioninfo.ClassName = MethodBase.GetCurrentMethod().DeclaringType.Name.ToString();
                exceptioninfo.MethodName = MethodBase.GetCurrentMethod().Name;
                exceptioninfo.InnerException = ex;

                //write to log
                Log.WriteToLog(exceptioninfo, ServiceSettings);
            }
            else if ((bool)ex.Data["WrittenToLog"] == false)
            {
                //set written to log as true
                ex.Data["WrittenToLog"] = true;

                //build exception object
                ExceptionInfo exceptioninfo = new ExceptionInfo();
                exceptioninfo.LogType = "E";
                exceptioninfo.ClassName = MethodBase.GetCurrentMethod().DeclaringType.Name.ToString();
                exceptioninfo.MethodName = MethodBase.GetCurrentMethod().Name;
                exceptioninfo.InnerException = ex;

                //write to log
                Log.WriteToLog(exceptioninfo, ServiceSettings);
            }

            //throw exception to back out of process
            //throw ex;
        }
    }

    #endregion

    private void timerReports_Tick(object sender, ElapsedEventArgs e)
    {
        //check to see if task is running, if not, process EDI
        try
        {
            //remove completed tasks               
            tasksCollection.RemoveAll(item => item.ReportTask.Status == TaskStatus.RanToCompletion);


            //Get Service settings
            dynamic ServiceSettings = ConfigSettings.GetServiceSettings();

            //set the SQL command and parameters
            SQLcommand Sqlcommandobj = new SQLcommand();
            Sqlcommandobj.SQLcmd = @"SELECT *,'TABLE' AS [TABLE_NAME] 
                                     FROM EX_TABLE";
            Sqlcommandobj.SQLcmdType = CommandType.Text;

            //fill in list 
            DataSet dsReportSchedules = Queries.ServiceSQLExecute(ServiceSettings, Sqlcommandobj);

            //loop through each schedule to add/remove tasks
            foreach (DataRow drReport in dsReportSchedules.Tables["SCHEDULES"].Rows)
            {
                if (!tasksCollection.Any(item => item.ReportID == Helper.GetValueFromDataRowInt32(drReport, "REPORTS_SCHEDULE_ID")))
                {
                    if (Helper.GetValueFromDataRowString(drReport, "ACTIVE") == "1" && Helper.GetValueFromDataRowString(drReport, "DELETE_DATE") == string.Empty)
                    {
                        //create cancellation for task
                        var ts = new CancellationTokenSource();

                        //create new task
                        Task newTask = new Task(() => ReportProcess.BeginProcessingReport(drReport, ServiceSettings), ts.Token);

                        //fill in report tasks object
                        ReportTasks ReportTasks = new ReportTasks();
                        ReportTasks.ReportID = Helper.GetValueFromDataRowInt32(drReport, "REPORTS_SCHEDULE_ID");
                        ReportTasks.ReportName = Helper.GetValueFromDataRowString(drReport, "NAME");
                        ReportTasks.ReportTask = newTask;
                        ReportTasks.TaskID = newTask.Id;
                        ReportTasks.Active = Convert.ToBoolean(drReport["ACTIVE"]);
                        ReportTasks.CancelTokenSource = ts;

                        //add to task collection
                        tasksCollection.Add(ReportTasks);
                    }
                }
                else
                {
                    //remove if not active or deleted
                    if (Helper.GetValueFromDataRowString(drReport, "ACTIVE") != "1" || Helper.GetValueFromDataRowString(drReport, "DELETE_DATE") != string.Empty)
                    {
                        var itemToRemove = tasksCollection.SingleOrDefault(item => item.ReportID == Helper.GetValueFromDataRowInt32(drReport, "REPORTS_SCHEDULE_ID"));
                        if (itemToRemove.ReportID > 0)
                        {
                            //check to see if task is running
                            if (itemToRemove.ReportTask.Status == TaskStatus.Running)
                            {
                                itemToRemove.CancelTokenSource.Cancel();
                            }

                            //remove task from collection
                            tasksCollection.Remove(itemToRemove);
                        }
                    }
                }
            }

            //trigger each task
            foreach (var str in tasksCollection)
            {
                Console.WriteLine("Task: " + str.ReportName + " - Status: " + str.ReportTask.Status);
                if (str.ReportTask.Status == TaskStatus.RanToCompletion | str.ReportTask.Status == TaskStatus.Created)
                {
                    str.ReportTask.Start();
                }
            }

        }
        catch (Exception ex)
        {
            //Get Service settings
            dynamic ServiceSettings = ConfigSettings.GetServiceSettings();

            if (ex.Data["WrittenToLog"] == null)
            {
                //set new object
                ex.Data.Add("WrittenToLog", true);

                //build exception object
                ExceptionInfo exceptioninfo = new ExceptionInfo();
                exceptioninfo.LogType = "E";
                exceptioninfo.ClassName = MethodBase.GetCurrentMethod().DeclaringType.Name.ToString();
                exceptioninfo.MethodName = MethodBase.GetCurrentMethod().Name;
                exceptioninfo.InnerException = ex;

                //write to log
                Log.WriteToLog(exceptioninfo, ServiceSettings);
            }
            else if ((bool)ex.Data["WrittenToLog"] == false)
            {
                //set written to log as true
                ex.Data["WrittenToLog"] = true;

                //build exception object
                ExceptionInfo exceptioninfo = new ExceptionInfo();
                exceptioninfo.LogType = "E";
                exceptioninfo.ClassName = MethodBase.GetCurrentMethod().DeclaringType.Name.ToString();
                exceptioninfo.MethodName = MethodBase.GetCurrentMethod().Name;
                exceptioninfo.InnerException = ex;

                //write to log
                Log.WriteToLog(exceptioninfo, ServiceSettings);
            }

            //throw exception to back out of process
            //throw ex;
        }

    }

}
}

Inвыше, во время отметки таймера, я просматриваю каждую запись в списке задач и удаляю выполненные задачи.Затем я беру записи из БД и определяю, нужно ли их добавлять.Он также проверит, помечен ли пользователь как удаленный или неактивный, и будет использовать токен отмены, чтобы остановить его, а затем удалить его из списка.Наконец, он будет проходить по циклу и запускать задачи в списке.

Это процесс, который вызывается для каждой задачи:

public static void BeginProcessingReport(Object drReportSchedule, dynamic 
ServiceSettings)
{
}

, и вот объект, который содержит задачи дляСписок <>

public struct ReportTasks
{
    public string ReportName;
    public Int32 ReportID;
    public int TaskID;
    public Task ReportTask;
    public bool Active;
    public CancellationTokenSource CancelTokenSource;
    public ReportTasks(string name, Int32 reportID, int id, Task task, DataRow drReport, bool active, CancellationTokenSource canceltokensource, CancellationToken canceltoken)
    {
        ReportName = name;
        ReportID = reportID;
        TaskID = id;
        ReportTask = task;
        Active = active;
        CancelTokenSource = canceltokensource;
    }
}

Надеюсь, этого достаточно, и любая помощь будет принята с благодарностью

ОБНОВЛЕНИЕ: Мне удалось решить проблемы, создав потокдля основной обработки задач.Удалили функциональность из метода тиков и поместили его в свой собственный метод, который будет вызываться потоком.

Таким образом, каждый тик из таймера будет проверять поток, чтобы увидеть, работает он или нет.

private static Thread mainThread = null;  

    private void timerReports_Tick(object sender, ElapsedEventArgs e)
    {
        //create thread if null
        if (mainThread == null)
        {
            mainThread = new Thread(new ThreadStart(Process));
        }

        //start thread if stopped or unstarted, else, it is still running and do nothing
        if(mainThread.ThreadState == System.Threading.ThreadState.Stopped || mainThread.ThreadState == System.Threading.ThreadState.Unstarted)
        {
            mainThread.Start();
        }
    }

ОБНОВЛЕНИЕ:

Приведенный выше код не работал, поскольку у меня все еще были проблемы с повторным вызовом задач, когда они не были выполнены.

Итак, позвольте мне перефразировать мой вопрос:Как мне создать динамические задачи / потоки для запуска той же функции / метода, в которой задачи / потоки управляются из базы данных?Требуется возможность удалять потоки и задачи, если записи удаляются или имеют значение Active = 0, которое является битовым полем в SQL, чтобы определить, должна ли запись сработать или нет.

1 Ответ

0 голосов
/ 24 сентября 2018

Под "наступлением друг на друга" вы имеете в виду, что тиковое событие происходит снова, пока предыдущее событие все еще обрабатывается?

В этом случае просто отключите таймер в начале тикового события и снова-ключается при завершении обработки

...