У нас есть Service Fabric StatefulService
, которая отлично работает.Он принимает сообщения, обрабатывает их и имеет два IReliableState
для хранения данных из каждого сообщения.Он будет обрабатывать около 500 сообщений в минуту для каждой реплики.
Для каждого сообщения, которое входит в MessageProcessor
, мы создаем новый ITransaction
с использованием IReliableStateManager
, который обернут в блок using, и передаем еготранзакция на OuterMessageHandler
.После обработки сообщения мы делаем ITransaction.CommitAsync
или ITransaction.Abort
в случае сбоя.
OuterMessageHandler
выглядит так:
public async Task Handle(ITransaction tx, params Envelope[] messages)
{
foreach (var msg in messages)
{
using (var scope = _scope.BeginLifetimeScope())
{
var contextProvider = scope.Resolve<MessageContextProvider>();
contextProvider.Set(tx);
await innerHandler.Handle(msg);
}
}
}
MessageContextProvider
- это простониже:
internal class MessageContextProvider
{
private ITransaction _tx;
public void Set(ITransaction tx)
{
_tx = tx;
}
public ITransaction GetTransaction()
{
if (_tx == null)
throw new Exception("ITransaction has not been set");
return _tx;
}
}
Это зарегистрировано с Autofac
:
builder
.Register(c =>
{
var context = c.Resolve<MessageContextProvider>();
return context.GetTransaction();
})
.As<ITransaction>()
.ExternallyOwned();
builder
.RegisterType<MessageContextProvider>()
.AsSelf()
.InstancePerLifetimeScope();
MessageContextProvider
существует просто для того, чтобы мы могли использовать ITransaction
во всех InnerMessageHandler
, как если быэто просто нормальная зависимость, хотя в каждом сообщении все равно используется одна и та же транзакция.ITransaction
помечен как ExternallyOwned
, так что OuterMessageHandler
не удаляет его до того, как мы сделаем коммит в MessageProcessor
.
InnerMessageHandler
s - просто обработчики для выполнения нашей бизнес-логики.
IReliableState
имеет 2 метода SomeType Find(long id)
и Update(long id, SomeType someType)
.
Метод поиска выглядит следующим образом:
var snapshotHandler = await _stateManager.GetOrAddAsync<IReliableDictionary<long, SomeType>>(SomeTypeKey);
var snapshot = await snapshotHandler.GetOrAddAsync(
_transaction,
Id,
new SomeType());
return snapshot;
Обновление выглядит следующим образом:
var snapshotHandler = await _stateManager.GetOrAddAsync<IReliableDictionary<long, SomeType>>(SomeTypeKey);
await snapshotHandler.SetAsync(_transaction, Id, snapshot);
Когда мы бросаем много запросов в службу, все реплики остаются значительно ниже 1% загрузки ЦП.Примерно через час одна из реплик достигает примерно 30% -35%.Когда мы перестаем запускать тесты на сервисе (т. Е. Сервис в настоящее время бездействует), загрузка ЦП по-прежнему остается на уровне 30-35%.Было бы хорошо, если бы у нас были всплески и мы вернулись вниз, но проблема заключается в постоянном высоком использовании ЦП.
Из нашего исследования мы заменили 2 IReliableState на 2 в памяти ConcurrentDictionary.Это решило проблему.Мы могли бы запустить его часами, и загрузка процессора не превысит 2%.Это, очевидно, не является решением, так как нам необходимо, чтобы внутреннее состояние сохранялось по причинам устойчивости.
Мы использовали PerfView и dotTrace, чтобы увидеть, что происходит, и поступило не так много достоверной информации.
На данный момент я считаю, что это связано с тем, как мы используем IReliableDictionary или ITransaction.У кого-нибудь есть похожие проблемы?Может ли кто-нибудь пролить свет на то, что мы, возможно, делаем неправильно?
Редактировать В одном из ReliableState
репозиториев (назовем его state2
) метод Find был немного по-другому.Это выглядело следующим образом:
var snapshotHandler = await _stateManager.GetOrAddAsync<IReliableDictionary<long, SomeType2>>(SomeTypeKey2);
var snapshot = await snapshotHandler.TryGetValueAsync(
_transaction,
Id);
if(snapshot.HasValue)
return snapshot.Value.Clone(); // Clone is a deep copy
var stateFromSomeApi = await someApi.GetStartState(id);
await snapshotHandler.SetValue(_transaction, Id, stateFromSomeApi);
return stateFromSomeApi ;
Мы изменили его, сделав следующее:
public async Task<SomeType2> Find(long id)
{
var snapshotHandler = await _stateManager.GetOrAddAsync<IReliableDictionary<long, SomeType2>>(
SomeTypeKey2);
var snapshot = await snapshotHandler.GetOrAddAsync(
_transaction,
id,
await GetStartState(id));
return snapshot.Clone();
}
private async Task<SomeType2> GetStartState(long id)
{
return await _someApi.GetStartState(id);
}
Мы изменили state2
, чтобы сделать GetOrAddAsync
, и он работает отлично.Почему в TryGetValueAsync
и SetValue
вместо GetOrAddAsync
потоки так часто зависают?Мы протестировали его почти с удвоенной производственной нагрузкой, и загрузка ЦП остается значительно ниже 5% для каждой первичной реплики.