IgniteQueue построен поверх Ignite Cache, так что да, вы можете повторить ту же функциональность в .NET:
- Создать кеш
- Использовать Непрерывный запрос как потребитель, вызовите ICache.Remove, чтобы гарантировать, что каждый элемент обрабатывается только один раз
- Добавить данные в кэш на производителях с Data Streamers или просто использовать
ICache.Put
/ PutAll
Ниже приведен код для прослушивателя непрерывных запросов:
class CacheEventListener<TK, TV> : ICacheEntryEventListener<TK, TV>
{
private readonly string _cacheName;
[InstanceResource] // Injected automatically.
private readonly IIgnite _ignite = null;
private ICache<TK, TV> _cache;
public CacheEventListener(string cacheName)
{
_cacheName = cacheName;
}
public void OnEvent(IEnumerable<ICacheEntryEvent<TK, TV>> events)
{
_cache = _cache ?? _ignite.GetCache<TK, TV>(_cacheName);
foreach (var entryEvent in events)
{
if (entryEvent.EventType == CacheEntryEventType.Created && _cache.Remove(entryEvent.Key))
{
// Run consumer logic here - use another thread for heavy processing.
Consume(entryEvent.Value);
}
}
}
}
Затем мы развернем его на каждом узле с помощью одного вызова:
var consumer = new CacheEventListener<Guid, string>(cache.Name);
var continuousQuery = new ContinuousQuery<Guid, string>(consumer);
cache.QueryContinuous(continuousQuery);
Какрезультат, OnEvent
вызывается один раз для каждой записи на основном узле для этой записи.Таким образом, есть один потребитель на узел Ignite.Мы можем увеличить эффективное число потребителей на узел, перенося фактическую потребительскую логику в другие потоки, используя BlockingCollection
и т. Д.
И еще одна вещь - мы должны придумать уникальный ключ кэша для каждого новогозапись.Самое простое - это Guid.NewGuid()
, но мы также можем использовать AtomicSequence
.