c # Поток потоков с SqlDataReaders и SqlDataAdapters - PullRequest
0 голосов
/ 30 сентября 2011

Мы замечаем, что внутри нашего .Net-приложения мы имеем дело с использованием SqlDataReader.Хотя мы понимаем, что SqlDataReader не является ThreadSafe, он должен масштабироваться.Следующий код является простым примером, демонстрирующим, что мы не можем масштабировать наше приложение, потому что существует конфликт между методами SqlDataReader GetValueМы не связаны процессором, диском или сетью;Просто внутренняя конкуренция на SqlDataReader.Мы можем запустить приложение 10 раз с 1 потоком, и оно масштабируется линейно, но 10 потоков в 1 приложении не масштабируются.Есть мысли о том, как масштабировать чтение из SQL Server в одном приложении c #?

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Diagnostics;
using System.Globalization;

namespace ThreadAndSQLTester
{
    class Host
    {
        /// <summary>
        /// Gets or sets the receive workers.
        /// </summary>
        /// <value>The receive workers.</value>
        internal List<Worker> Workers { get; set; }
        /// <summary>
        /// Gets or sets the receive threads.
        /// </summary>
        /// <value>The receive threads.</value>
        internal List<Thread> Threads { get; set; }

        public int NumberOfThreads { get; set; }
        public int Sleep { get; set; }
        public int MinutesToRun { get; set; }
        public bool IsRunning { get; set; }
        private System.Timers.Timer runTime;

        private object lockVar = new object();

        public Host()
        {
            Init(1, 0, 0);
        }

        public Host(int numberOfThreads, int sleep, int minutesToRun)
        {
            Init(numberOfThreads, sleep, minutesToRun);
        }

        private void Init(int numberOfThreads, int sleep, int minutesToRun)
        {
            this.Workers = new List<Worker>();
            this.Threads = new List<Thread>();

            this.NumberOfThreads = numberOfThreads;
            this.Sleep = sleep;
            this.MinutesToRun = minutesToRun;

            SetUpTimer();
        }

        private void SetUpTimer()
        {
            if (this.MinutesToRun > 0)
            {
                this.runTime = new System.Timers.Timer();
                this.runTime.Interval = TimeSpan.FromMinutes(this.MinutesToRun).TotalMilliseconds;
                this.runTime.Elapsed += new System.Timers.ElapsedEventHandler(runTime_Elapsed);
                this.runTime.Start();
            }
        }

        void runTime_Elapsed(object sender, System.Timers.ElapsedEventArgs e)
        {
            this.runTime.Stop();
            this.Stop();
            this.IsRunning = false;
        }

        public void Start()
        {
            this.IsRunning = true;

            Random r = new Random(DateTime.Now.Millisecond);

            for (int i = 0; i < this.NumberOfThreads; i++)
            {
                string threadPoolId = Math.Ceiling(r.NextDouble() * 10).ToString();

                Worker worker = new Worker("-" + threadPoolId); //i.ToString());
                worker.Sleep = this.Sleep;

                this.Workers.Add(worker);

                Thread thread = new Thread(worker.Work);
                worker.Name = string.Format("WorkerThread-{0}", i);

                thread.Name = worker.Name;

                this.Threads.Add(thread);
                thread.Start();

                Debug.WriteLine(string.Format(CultureInfo.InvariantCulture, "Started new Worker Thread. Total active: {0}", i + 1));
            }
        }

        public void Stop()
        {
            if (this.Workers != null)
            {
                lock (lockVar)
                {
                    for (int i = 0; i < this.Workers.Count; i++)
                    {
                        //Thread thread = this.Threads[i];
                        //thread.Interrupt();
                        this.Workers[i].IsEnabled = false;
                    }

                    for (int i = this.Workers.Count - 1; i >= 0; i--)
                    {
                        Worker worker = this.Workers[i];
                        while (worker.IsRunning)
                        {
                            Thread.Sleep(32);
                        }
                    }

                    foreach (Thread thread in this.Threads)
                    {
                        thread.Abort();
                    }

                    this.Workers.Clear();
                    this.Threads.Clear();
                }
            }
        }

    }
}

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Data.SqlClient;
using System.Data;
using System.Threading;
using System.ComponentModel;
using System.Data.OleDb;

namespace ThreadAndSQLTester
{
    class Worker
    {
        public bool IsEnabled { get; set; }
        public bool IsRunning { get; set; }
        public string Name { get; set; }
        public int Sleep { get; set; }

        private string dataCnString { get; set; }
        private string logCnString { get; set; }

        private List<Log> Logs { get; set; }

        public Worker(string threadPoolId)
        {
            this.Logs = new List<Log>();

            SqlConnectionStringBuilder cnBldr = new SqlConnectionStringBuilder();
            cnBldr.DataSource = @"trgcrmqa3";
            cnBldr.InitialCatalog = "Scratch";
            cnBldr.IntegratedSecurity = true;
            cnBldr.MultipleActiveResultSets = true;
            cnBldr.Pooling = true;            

            dataCnString = GetConnectionStringWithWorkStationId(cnBldr.ToString(), threadPoolId);            

            cnBldr = new SqlConnectionStringBuilder();
            cnBldr.DataSource = @"trgcrmqa3";
            cnBldr.InitialCatalog = "Scratch";
            cnBldr.IntegratedSecurity = true;

            logCnString = GetConnectionStringWithWorkStationId(cnBldr.ToString(), string.Empty);

            IsEnabled = true;
        }

        private string machineName { get; set; }
        private string GetConnectionStringWithWorkStationId(string connectionString, string connectionPoolToken)
        {
            if (string.IsNullOrEmpty(machineName)) machineName = Environment.MachineName;

            SqlConnectionStringBuilder cnbdlr;
            try
            {
                cnbdlr = new SqlConnectionStringBuilder(connectionString);
            }
            catch
            {
                throw new ArgumentException("connection string was an invalid format");
            }

            cnbdlr.WorkstationID = machineName + connectionPoolToken;

            return cnbdlr.ConnectionString;
        }

        public void Work()
        {
            int i = 0;

            while (this.IsEnabled)
            {
                this.IsRunning = true;

                try
                {
                    Log log = new Log();
                    log.WorkItemId = Guid.NewGuid();
                    log.StartTime = DateTime.Now;
                    List<object> lst = new List<object>();

                    using (SqlConnection cn = new SqlConnection(this.dataCnString))
                    {
                        try
                        {
                            cn.Open();

                            using (SqlCommand cmd = new SqlCommand("Analysis.spSelectTestData", cn))
                            {
                                cmd.CommandType = System.Data.CommandType.StoredProcedure;

                                using (SqlDataReader dr = cmd.ExecuteReader(CommandBehavior.SequentialAccess)) // DBHelper.ExecuteReader(cn, cmd))
                                {                                    
                                    while (dr.Read())
                                    {
                                        CreateClaimHeader2(dr, lst);
                                    }

                                    dr.Close();
                                }

                                cmd.Cancel();
                            }
                        }
                        catch { }
                        finally
                        {
                            cn.Close();
                        }
                    }

                    log.StopTime = DateTime.Now;
                    log.RouteName = this.Name;
                    log.HostName = this.machineName;

                    this.Logs.Add(log);
                    i++;

                    if (i > 1000)
                    {
                        Console.WriteLine(string.Format("Thread: {0} executed {1} items.", this.Name, i));
                        i = 0;
                    }

                    if (this.Sleep > 0) Thread.Sleep(this.Sleep);
                }
                catch { }
            }

            this.LogMessages();

            this.IsRunning = false;
        }       

        private void CreateClaimHeader2(IDataReader reader, List<object> lst)
        {
            lst.Add(reader["ClaimHeaderID"]);
            lst.Add(reader["ClientCode"]);
            lst.Add(reader["MemberID"]);
            lst.Add(reader["ProviderID"]);
            lst.Add(reader["ClaimNumber"]);
            lst.Add(reader["PatientAcctNumber"]);
            lst.Add(reader["Source"]);
            lst.Add(reader["SourceID"]);
            lst.Add(reader["TotalPayAmount"]);
            lst.Add(reader["TotalBillAmount"]);
            lst.Add(reader["FirstDateOfService"]);
            lst.Add(reader["LastDateOfService"]);
            lst.Add(reader["MaxStartDateOfService"]);
            lst.Add(reader["MaxValidStartDateOfService"]);
            lst.Add(reader["LastUpdated"]);
            lst.Add(reader["UpdatedBy"]);
        }

        /// <summary>
        /// Toes the data table.
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="data">The data.</param>
        /// <returns></returns>
        public DataTable ToDataTable<T>(IEnumerable<T> data)
        {
            PropertyDescriptorCollection props =
                TypeDescriptor.GetProperties(typeof(T));

            if (props == null) throw new ArgumentNullException("Table properties.");
            if (data == null) throw new ArgumentNullException("data");

            DataTable table = new DataTable();
            for (int i = 0; i < props.Count; i++)
            {
                PropertyDescriptor prop = props[i];
                table.Columns.Add(prop.Name, Nullable.GetUnderlyingType(prop.PropertyType) ?? prop.PropertyType);
            }
            object[] values = new object[props.Count];
            foreach (T item in data)
            {
                for (int i = 0; i < values.Length; i++)
                {
                    values[i] = props[i].GetValue(item) ?? DBNull.Value;
                }
                table.Rows.Add(values);
            }
            return table;
        }


        private void LogMessages()
        {
            using (SqlConnection cn = new SqlConnection(this.logCnString))
            {
                try
                {
                    cn.Open();
                    DataTable dt = ToDataTable(this.Logs);

                    Console.WriteLine(string.Format("Logging {0} records for Thread: {1}", this.Logs.Count, this.Name));

                    using (SqlCommand cmd = new SqlCommand("Analysis.spInsertWorkItemRouteLog", cn))
                    {
                        cmd.CommandType = System.Data.CommandType.StoredProcedure;

                        cmd.Parameters.AddWithValue("@dt", dt);

                        cmd.ExecuteNonQuery();
                    }

                    Console.WriteLine(string.Format("Logged {0} records for Thread: {1}", this.Logs.Count, this.Name));
                }
                finally
                {
                    cn.Close();
                }
            }
        }
    }
}

Ответы [ 2 ]

0 голосов
/ 15 марта 2013
1.A DataReader works in a connected environment,
whereas DataSet works in a disconnected environment.

2.A DataSet represents an in-memory cache of data consisting of any number of inter related  DataTable objects. A DataTable object represents a tabular block of in-memory data.

SqlDataAdapter или sqlDataReader

0 голосов
/ 15 марта 2013

Разница между SqlDataAdapter или sqlDataReader? Ответ: 1. DataReader работает в подключенной среде, тогда как DataSet работает в автономной среде. 2. DataSet представляет собой кэш данных в памяти, состоящий из любого количества взаимосвязанных объектов DataTable. Объект DataTable представляет собой табличный блок данных в памяти.

SqlDataAdapter или sqlDataReader

...