Мы замечаем, что внутри нашего .NET-приложения возникает конкуренция (contention) при использовании SqlDataReader. Хотя мы понимаем, что SqlDataReader не является потокобезопасным (thread-safe), он должен масштабироваться. Следующий код — простой пример, демонстрирующий, что мы не можем масштабировать наше приложение, потому что возникает конкуренция в методе GetValue класса SqlDataReader. Мы не упираемся в процессор, диск или сеть; дело только во внутренней конкуренции в SqlDataReader. Мы можем запустить приложение 10 раз с 1 потоком, и оно масштабируется линейно, но 10 потоков в 1 приложении не масштабируются. Есть мысли о том, как масштабировать чтение из SQL Server в одном приложении на C#?
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Diagnostics;
using System.Globalization;
namespace ThreadAndSQLTester
{
/// <summary>
/// Test harness that creates <see cref="NumberOfThreads"/> <see cref="Worker"/> instances,
/// runs each one on its own dedicated thread and, when <see cref="MinutesToRun"/> is
/// positive, stops them all once that many minutes have elapsed.
/// </summary>
/// <remarks>
/// The run-time timer is created and started by the constructor, so the countdown begins
/// when the host is constructed (not when <see cref="Start"/> is called), and changing
/// <see cref="MinutesToRun"/> afterwards has no effect on it.
/// </remarks>
class Host
{
    /// <summary>
    /// Gets or sets the workers created by <see cref="Start"/>.
    /// </summary>
    /// <value>The workers.</value>
    internal List<Worker> Workers { get; set; }

    /// <summary>
    /// Gets or sets the threads that run the workers; one thread per worker.
    /// </summary>
    /// <value>The worker threads.</value>
    internal List<Thread> Threads { get; set; }

    /// <summary>Number of worker threads that <see cref="Start"/> launches.</summary>
    public int NumberOfThreads { get; set; }

    /// <summary>Value copied to <see cref="Worker.Sleep"/> of every worker that is started.</summary>
    public int Sleep { get; set; }

    /// <summary>Minutes after construction at which the host stops itself; 0 or less disables the timer.</summary>
    public int MinutesToRun { get; set; }

    /// <summary>True from <see cref="Start"/> until the workers have been stopped.</summary>
    public bool IsRunning { get; set; }

    private System.Timers.Timer runTime;

    // Guards Workers/Threads: Start() runs on the caller's thread while Stop() may run
    // on the thread that raises the timer's Elapsed event.
    private readonly object lockVar = new object();

    /// <summary>
    /// Creates a host with one worker thread, no sleep between work items and no time limit.
    /// </summary>
    public Host()
    {
        Init(1, 0, 0);
    }

    /// <summary>
    /// Creates a host with the given settings.
    /// </summary>
    /// <param name="numberOfThreads">Number of worker threads to launch.</param>
    /// <param name="sleep">Value passed to each worker's <see cref="Worker.Sleep"/>.</param>
    /// <param name="minutesToRun">Minutes until the host stops itself; 0 or less means no limit.</param>
    public Host(int numberOfThreads, int sleep, int minutesToRun)
    {
        Init(numberOfThreads, sleep, minutesToRun);
    }

    /// <summary>
    /// Shared constructor logic: stores the settings and arms the run-time timer.
    /// </summary>
    private void Init(int numberOfThreads, int sleep, int minutesToRun)
    {
        this.Workers = new List<Worker>();
        this.Threads = new List<Thread>();
        this.NumberOfThreads = numberOfThreads;
        this.Sleep = sleep;
        this.MinutesToRun = minutesToRun;
        SetUpTimer();
    }

    /// <summary>
    /// Starts a one-shot timer that stops the host after <see cref="MinutesToRun"/> minutes.
    /// Does nothing when <see cref="MinutesToRun"/> is not positive.
    /// </summary>
    private void SetUpTimer()
    {
        if (this.MinutesToRun > 0)
        {
            this.runTime = new System.Timers.Timer();
            this.runTime.Interval = TimeSpan.FromMinutes(this.MinutesToRun).TotalMilliseconds;
            // The host only ever needs a single shutdown signal.
            this.runTime.AutoReset = false;
            this.runTime.Elapsed += new System.Timers.ElapsedEventHandler(runTime_Elapsed);
            this.runTime.Start();
        }
    }

    /// <summary>
    /// Timer callback: stops all workers and marks the host as no longer running.
    /// </summary>
    void runTime_Elapsed(object sender, System.Timers.ElapsedEventArgs e)
    {
        this.runTime.Stop();
        this.Stop();
        this.IsRunning = false;
    }

    /// <summary>
    /// Creates <see cref="NumberOfThreads"/> workers and starts each on its own thread
    /// named "WorkerThread-&lt;index&gt;".
    /// </summary>
    public void Start()
    {
        // Take the same lock as Stop() so a timer-triggered Stop() cannot observe or
        // clear the lists while they are still being filled.
        lock (lockVar)
        {
            this.IsRunning = true;
            // Default seeding; seeding from DateTime.Now.Millisecond allowed only 1000 distinct seeds.
            Random r = new Random();
            for (int i = 0; i < this.NumberOfThreads; i++)
            {
                // Random token in 1..10 handed to the worker's constructor.
                string threadPoolId = r.Next(1, 11).ToString(CultureInfo.InvariantCulture);
                Worker worker = new Worker("-" + threadPoolId);
                worker.Sleep = this.Sleep;
                worker.Name = string.Format("WorkerThread-{0}", i);
                this.Workers.Add(worker);

                Thread thread = new Thread(worker.Work);
                thread.Name = worker.Name;
                this.Threads.Add(thread);
                thread.Start();
                Debug.WriteLine(string.Format(CultureInfo.InvariantCulture, "Started new Worker Thread. Total active: {0}", i + 1));
            }
        }
    }

    /// <summary>
    /// Asks every worker to stop, waits for all worker threads to finish and clears the
    /// worker and thread lists. Blocks until every worker thread has exited.
    /// </summary>
    public void Stop()
    {
        if (this.Workers == null)
        {
            return;
        }

        lock (lockVar)
        {
            // Signal all workers first so they wind down in parallel.
            foreach (Worker worker in this.Workers)
            {
                worker.IsEnabled = false;
            }

            // Join instead of polling Worker.IsRunning followed by Thread.Abort():
            // a worker that has been started but has not yet set IsRunning would look
            // finished to a poll and then be aborted mid-work, and Thread.Abort is not
            // supported on current .NET runtimes.
            foreach (Thread thread in this.Threads)
            {
                thread.Join();
            }

            this.Workers.Clear();
            this.Threads.Clear();
            this.IsRunning = false;
        }
    }
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Data.SqlClient;
using System.Data;
using System.Threading;
using System.ComponentModel;
using System.Data.OleDb;
namespace ThreadAndSQLTester
{
/// <summary>
/// Load-test worker: while enabled, repeatedly executes the Analysis.spSelectTestData
/// stored procedure, copies a fixed set of columns from every returned row, and records
/// the timing of each pass. When disabled it writes the collected timing records through
/// Analysis.spInsertWorkItemRouteLog and exits.
/// </summary>
class Worker
{
    // Columns copied from every result row, in the order they are read.
    // NOTE(review): the reader is opened with CommandBehavior.SequentialAccess, which
    // requires columns to be read in the order they are returned - this assumes the
    // stored procedure returns its columns in this order; confirm.
    private static readonly string[] ClaimHeaderColumns =
    {
        "ClaimHeaderID",
        "ClientCode",
        "MemberID",
        "ProviderID",
        "ClaimNumber",
        "PatientAcctNumber",
        "Source",
        "SourceID",
        "TotalPayAmount",
        "TotalBillAmount",
        "FirstDateOfService",
        "LastDateOfService",
        "MaxStartDateOfService",
        "MaxValidStartDateOfService",
        "LastUpdated",
        "UpdatedBy"
    };

    // After the first failure, only every N-th one is printed so that a persistently
    // failing worker does not flood the console.
    private const int FailureReportInterval = 1000;

    // These flags are written by one thread and read by another (the thread that owns
    // the worker vs. the thread running Work), so the backing fields are volatile to
    // make every write visible to the other thread.
    private volatile bool isEnabled;
    private volatile bool isRunning;

    private int failureCount;

    /// <summary>Set to false to ask <see cref="Work"/> to finish after the current work item.</summary>
    public bool IsEnabled
    {
        get { return isEnabled; }
        set { isEnabled = value; }
    }

    /// <summary>True while <see cref="Work"/> is processing work items or writing its log.</summary>
    public bool IsRunning
    {
        get { return isRunning; }
        set { isRunning = value; }
    }

    /// <summary>Display name of the worker; also stored as the route name of every log record.</summary>
    public string Name { get; set; }

    /// <summary>Milliseconds to sleep after each work item; 0 or less means no pause.</summary>
    public int Sleep { get; set; }

    private string dataCnString { get; set; }
    private string logCnString { get; set; }
    private List<Log> Logs { get; set; }
    private string machineName { get; set; }

    /// <summary>
    /// Creates an enabled worker and builds its two connection strings.
    /// </summary>
    /// <param name="threadPoolId">
    /// Token appended to the workstation id of the data connection string, so workers
    /// created with different tokens use different connection strings.
    /// </param>
    public Worker(string threadPoolId)
    {
        this.Logs = new List<Log>();

        // Data connection: MARS and pooling switched on explicitly.
        SqlConnectionStringBuilder dataBuilder = CreateBaseBuilder();
        dataBuilder.MultipleActiveResultSets = true;
        dataBuilder.Pooling = true;
        dataCnString = GetConnectionStringWithWorkStationId(dataBuilder.ToString(), threadPoolId);

        // Log connection: builder defaults, no pool token.
        logCnString = GetConnectionStringWithWorkStationId(CreateBaseBuilder().ToString(), string.Empty);

        IsEnabled = true;
    }

    /// <summary>
    /// Returns a builder holding the settings shared by the data and log connections.
    /// </summary>
    private static SqlConnectionStringBuilder CreateBaseBuilder()
    {
        SqlConnectionStringBuilder builder = new SqlConnectionStringBuilder();
        builder.DataSource = @"trgcrmqa3";
        builder.InitialCatalog = "Scratch";
        builder.IntegratedSecurity = true;
        return builder;
    }

    /// <summary>
    /// Returns <paramref name="connectionString"/> with its workstation id set to the
    /// local machine name followed by <paramref name="connectionPoolToken"/>.
    /// </summary>
    /// <param name="connectionString">Connection string to start from.</param>
    /// <param name="connectionPoolToken">Suffix appended to the machine name.</param>
    /// <returns>The modified connection string.</returns>
    /// <exception cref="ArgumentException">The connection string could not be parsed.</exception>
    private string GetConnectionStringWithWorkStationId(string connectionString, string connectionPoolToken)
    {
        if (string.IsNullOrEmpty(machineName))
        {
            machineName = Environment.MachineName;
        }

        SqlConnectionStringBuilder cnbdlr;
        try
        {
            cnbdlr = new SqlConnectionStringBuilder(connectionString);
        }
        catch (Exception ex)
        {
            // Keep the original failure as the inner exception instead of discarding it.
            throw new ArgumentException("connection string was an invalid format", ex);
        }

        cnbdlr.WorkstationID = machineName + connectionPoolToken;
        return cnbdlr.ConnectionString;
    }

    /// <summary>
    /// Thread entry point. Processes work items until <see cref="IsEnabled"/> becomes
    /// false, then writes the collected log records. <see cref="IsRunning"/> is always
    /// reset to false on exit, even when writing the log fails.
    /// </summary>
    public void Work()
    {
        int i = 0;
        try
        {
            while (this.IsEnabled)
            {
                this.IsRunning = true;
                try
                {
                    Log log = new Log();
                    log.WorkItemId = Guid.NewGuid();
                    log.StartTime = DateTime.Now;

                    // The values are only read to exercise the reader; they are discarded.
                    List<object> lst = new List<object>();
                    try
                    {
                        ReadTestData(lst);
                    }
                    catch (Exception ex)
                    {
                        // Best effort: a failed read is still recorded as a work item,
                        // but the failure is reported rather than silently swallowed.
                        ReportFailure("reading test data", ex);
                    }

                    log.StopTime = DateTime.Now;
                    log.RouteName = this.Name;
                    log.HostName = this.machineName;
                    this.Logs.Add(log);

                    i++;
                    if (i > 1000)
                    {
                        Console.WriteLine(string.Format("Thread: {0} executed {1} items.", this.Name, i));
                        i = 0;
                    }

                    if (this.Sleep > 0)
                    {
                        Thread.Sleep(this.Sleep);
                    }
                }
                catch (Exception ex)
                {
                    ReportFailure("processing a work item", ex);
                }
            }

            this.LogMessages();
        }
        catch (Exception ex)
        {
            // Always reported (not throttled): this is the worker's final action.
            Console.Error.WriteLine(string.Format("Thread: {0} failed while writing the work item log: {1}", this.Name, ex));
        }
        finally
        {
            // Without this, a failure in LogMessages would leave IsRunning stuck at true
            // and anything polling it would wait forever.
            this.IsRunning = false;
        }
    }

    /// <summary>
    /// Runs Analysis.spSelectTestData on a fresh connection and copies the tracked
    /// columns of every returned row into <paramref name="lst"/>.
    /// </summary>
    private void ReadTestData(List<object> lst)
    {
        // The using blocks close the reader and the connection, so no explicit
        // Close() calls are needed.
        using (SqlConnection cn = new SqlConnection(this.dataCnString))
        using (SqlCommand cmd = new SqlCommand("Analysis.spSelectTestData", cn))
        {
            cmd.CommandType = CommandType.StoredProcedure;
            cn.Open();
            using (SqlDataReader dr = cmd.ExecuteReader(CommandBehavior.SequentialAccess))
            {
                while (dr.Read())
                {
                    CreateClaimHeader2(dr, lst);
                }
            }
        }
    }

    /// <summary>
    /// Counts a failure and prints the first one and every
    /// <see cref="FailureReportInterval"/>-th one after that to standard error.
    /// </summary>
    private void ReportFailure(string stage, Exception ex)
    {
        failureCount++;
        if (failureCount == 1 || failureCount % FailureReportInterval == 0)
        {
            Console.Error.WriteLine(string.Format("Thread: {0} failed while {1} ({2} failures so far): {3}", this.Name, stage, failureCount, ex));
        }
    }

    /// <summary>
    /// Appends the value of each tracked column of the reader's current row to
    /// <paramref name="lst"/>, in the order the columns are listed.
    /// </summary>
    /// <param name="reader">Reader positioned on the row to copy.</param>
    /// <param name="lst">List that receives the column values.</param>
    private void CreateClaimHeader2(IDataReader reader, List<object> lst)
    {
        foreach (string column in ClaimHeaderColumns)
        {
            lst.Add(reader[column]);
        }
    }

    /// <summary>
    /// Builds a <see cref="DataTable"/> with one column per property of
    /// <typeparamref name="T"/> and one row per item of <paramref name="data"/>.
    /// Nullable property types are mapped to their underlying type and null property
    /// values are stored as <see cref="DBNull.Value"/>.
    /// </summary>
    /// <typeparam name="T">Type whose properties define the columns.</typeparam>
    /// <param name="data">Items to convert into rows.</param>
    /// <returns>The populated table.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="data"/> is null.</exception>
    public DataTable ToDataTable<T>(IEnumerable<T> data)
    {
        PropertyDescriptorCollection props =
            TypeDescriptor.GetProperties(typeof(T));
        if (props == null) throw new ArgumentNullException("Table properties.");
        if (data == null) throw new ArgumentNullException("data");

        DataTable table = new DataTable();
        for (int i = 0; i < props.Count; i++)
        {
            PropertyDescriptor prop = props[i];
            // DataTable columns cannot be typed as Nullable<T>; use the underlying type.
            table.Columns.Add(prop.Name, Nullable.GetUnderlyingType(prop.PropertyType) ?? prop.PropertyType);
        }

        // One buffer is reused for every row; Rows.Add copies the values out of it.
        object[] values = new object[props.Count];
        foreach (T item in data)
        {
            for (int i = 0; i < values.Length; i++)
            {
                values[i] = props[i].GetValue(item) ?? DBNull.Value;
            }
            table.Rows.Add(values);
        }
        return table;
    }

    /// <summary>
    /// Sends all collected log records to Analysis.spInsertWorkItemRouteLog as the
    /// "@dt" parameter, printing a message before and after the call.
    /// </summary>
    private void LogMessages()
    {
        // The using block closes the connection on every path.
        using (SqlConnection cn = new SqlConnection(this.logCnString))
        {
            cn.Open();
            DataTable dt = ToDataTable(this.Logs);
            Console.WriteLine(string.Format("Logging {0} records for Thread: {1}", this.Logs.Count, this.Name));
            using (SqlCommand cmd = new SqlCommand("Analysis.spInsertWorkItemRouteLog", cn))
            {
                cmd.CommandType = CommandType.StoredProcedure;
                cmd.Parameters.AddWithValue("@dt", dt);
                cmd.ExecuteNonQuery();
            }
            Console.WriteLine(string.Format("Logged {0} records for Thread: {1}", this.Logs.Count, this.Name));
        }
    }
}
}