У меня довольно высокая пропускная способность для счетчика сообщений (десятки тысяч в секунду), и я ищу эффективный способ получения счетчика без повсеместной блокировки или, в идеале, без блокировки каждого счетчика сообщений, когда я обновляю каждый 10 секунд.
Использование объекта неизменяемого счетчика
Я использую класс неизменяемого счетчика:
public class Counter
{
public Counter(int quotes, int trades)
{
Quotes = quotes;
Trades = trades;
}
readonly public int Quotes;
readonly public int Trades;
// and some other counter fields snipped
}
И будет обновлять это при каждом сообщении process l oop:
class MyProcessor
{
System.Timers.Timer timer;
Counter counter = new Counter(0,0);
public MyProcessor()
{
// update ever 10 seconds
this.timer = new System.Timers.Timer(10000);
timer.Elapsed += (sender, e) => {
var quotesPerSecond = this.counter.Quotes / 10.0;
var tradesPerSecond = this.counter.Trades / 10.0;
this.Counter = new Counter(0,0);
});
}
public void ProcessMessages(Messages messages)
{
foreach(var message in messages) { /* */ }
var oldCounter = counter;
this.counter = new Counter(oldCounter.Quotes, oldCounter.Trades);
}
}
У меня много счетчиков (не все показаны), поэтому будет означать много отдельных вызовов Interlocked.Increment
для отдельных полей счетчиков.
Единственное Другой способ, о котором я могу думать, это блокировка каждого запуска ProcessMessages
(который будет обширным) и тяжелый для чего-то, что является утилитой, а не критическим, когда программа взломает sh.
Можно ли использовать неизменяемый объект-счетчик таким образом без жестких механизмов блокировки / потоков, когда нам нужно обновлять только раз в 10 секунд?
Идея проверки флага чтобы избежать блокировок
Может ли поток таймера установить флаг для ProcessMessages
, чтобы проверить, и если он видит, что он установлен, снова начать отсчет с нуля, то есть
/* snipped the MyProcessor class, same as before */
System.Timers.Timer timer;
Counter counter = new Counter(0,0);
ManualResetEvent reset = new ManualResetEvent(false);
public MyProcessor()
{
// update ever 10 seconds
this.timer = new System.Timers.Timer(10000);
timer.Elapsed += (sender, e) => {
var quotesPerSecond = this.counter.Quotes / 10.0;
var tradesPerSecond = this.counter.Trades / 10.0;
// log
this.reset.Set();
});
}
// this should be called every second with a heartbeat message posted to queue
public void ProcessMessages(Messages messages)
{
if (reset.WaitOne(0) == true)
{
this.counter = new Counter(this.counter.Quotes, this.counter.Trades, this.counter.Aggregates);
reset.Reset();
}
else
{
this.counter = new Counter(
this.counter.Quotes + message.Quotes.Count,
this.counter.Trades + message.Trades.Count);
}
}
/* end of MyProcessor class */
Это сработало бы, однако обновление «останавливается», когда сообщения процесса останавливаются (хотя пропускная способность очень высока, она делает паузу на несколько часов ночью, в идеале должно отображаться фактическое, а не последнее значение).
Одним из способов решения этой проблемы является отправка сообщения пульса на MyProcessor.ProcessMessages()
каждую секунду для принудительного внутреннего обновления счетчиков сообщений и последующего сброса, когда установлен reset
ManualResetEvent.