IgniteQueue в Apache Ignite.NET - PullRequest
       57

IgniteQueue в Apache Ignite.NET

1 голос
/ 25 сентября 2019

Мы используем Ignite.NET и не имеем возможности использовать Ignite Java API (навыки работы в команде, сходство с технологиями и т. Д.).Мы стремимся создать механизм очередей, чтобы мы могли обрабатывать сообщения распределенным способом.Я обнаружил, что структура данных IgniteQueue является наиболее подходящей, но она не доступна в ignite.net, может кто-нибудь предложить решение сценария.Несколько производителей ставят в очередь уникальный рабочий элемент для надежной обработки только одним потребителем за раз.

Например, есть производители P1, P2 (на разных машинах), которые генерируют T1, T2, T3 в очереди, и у нас есть потребители C1, C2, C3 (на разных машинах), теперь T1 должен обрабатываться ТОЛЬКО 1из C1, C2, C3 и т. д. для T2, T3 также аналогичным образом должен обрабатываться только один раз одним потребителем

1 Ответ

1 голос
/ 26 сентября 2019

IgniteQueue построен поверх Ignite Cache, так что да, вы можете повторить ту же функциональность в .NET:

  1. Создать кеш
  2. Использовать Непрерывный запрос как потребитель, вызовите ICache.Remove, чтобы гарантировать, что каждый элемент обрабатывается только один раз
  3. Добавить данные в кэш на производителях с Data Streamers или просто использовать ICache.Put / PutAll

Ниже приведен код для прослушивателя непрерывных запросов:

class CacheEventListener<TK, TV> : ICacheEntryEventListener<TK, TV>
{
    private readonly string _cacheName;

    [InstanceResource]  // Injected automatically.
    private readonly IIgnite _ignite = null;

    private ICache<TK, TV> _cache;

    public CacheEventListener(string cacheName)
    {
        _cacheName = cacheName;
    }

    public void OnEvent(IEnumerable<ICacheEntryEvent<TK, TV>> events)
    {
        _cache = _cache ?? _ignite.GetCache<TK, TV>(_cacheName);

        foreach (var entryEvent in events)
        {
            if (entryEvent.EventType == CacheEntryEventType.Created && _cache.Remove(entryEvent.Key))
            {
                // Run consumer logic here - use another thread for heavy processing.
                Consume(entryEvent.Value);
            }
        }
    }
}

Затем мы развернем его на каждом узле с помощью одного вызова:

var consumer = new CacheEventListener<Guid, string>(cache.Name);
var continuousQuery = new ContinuousQuery<Guid, string>(consumer);
cache.QueryContinuous(continuousQuery);

Какрезультат, OnEvent вызывается один раз для каждой записи на основном узле для этой записи.Таким образом, есть один потребитель на узел Ignite.Мы можем увеличить эффективное число потребителей на узел, перенося фактическую потребительскую логику в другие потоки, используя BlockingCollection и т. Д.

И еще одна вещь - мы должны придумать уникальный ключ кэша для каждого новогозапись.Самое простое - это Guid.NewGuid(), но мы также можем использовать AtomicSequence.

...