Используя службу WCF через асинхронный интерфейс из рабочего потока, как я могу гарантировать, что события отправляются от клиента «по порядку» - PullRequest
1 голос
/ 09 ноября 2010

Я пишу библиотеку классов Silverlight для абстрагирования интерфейса от службы WCF. Служба WCF предоставляет централизованную службу ведения журнала. Библиотека классов Silverlight предоставляет упрощенный интерфейс, похожий на log4net (logger.Info, logger.Warn и т. Д.) Для ведения журнала. Из библиотеки классов я планирую предоставить такие параметры, чтобы зарегистрированные сообщения могли накапливаться на клиенте и отправляться «пакетами» в службу регистрации WCF, а не отправлять каждое сообщение по мере его появления. Как правило, это работает хорошо. Библиотека классов накапливает сообщения и отправляет коллекции сообщений в службу ведения журнала WCF, где они регистрируются базовой структурой ведения журнала.

Моя текущая проблема заключается в том, что сообщения (от одного клиента с одним потоком - весь код ведения журнала находится в событиях нажатия кнопки) чередуются в службе ведения журнала. Я понимаю, что, по крайней мере, отчасти это происходит из-за создания экземпляров (PerCall) или синхронизации службы журналов WCF. Однако также кажется, что мои сообщения происходят с такой быстрой последовательностью, что «пакеты» сообщений, оставляемых при асинхронных вызовах, фактически «оставляют» клиента в другом порядке, чем они были сгенерированы.

Я попытался настроить очередь потребителя производителя, как описано здесь с небольшим (или должно быть "небольшим" с воздушными кавычками) изменением, которое блокирует метод Work (WaitOne) до асинхронного вызова возвращает (т.е. пока не выполнится асинхронный обратный вызов). Идея состоит в том, что когда один пакет сообщений отправляется в службу ведения журналов WCF, очередь должна ждать, пока этот пакет не будет обработан, перед отправкой следующего пакета.

Возможно, то, что я пытаюсь сделать, неосуществимо, или, может быть, я пытаюсь решить не ту проблему (или, может быть, я просто не знаю, что я делаю!).

В любом случае, вот мой код очереди производителя / потребителя:

  internal class ProducerConsumerQueue : IDisposable
  {
    EventWaitHandle wh = new AutoResetEvent(false);
    Thread worker;
    readonly object locker = new object();
    Queue<ObservableCollection<LoggingService.LogEvent>> logEventQueue = new Queue<ObservableCollection<LoggingService.LogEvent>>();

    LoggingService.ILoggingService loggingService;

    internal ProducerConsumerQueue(LoggingService.ILoggingService loggingService)
    {
      this.loggingService = loggingService;
      worker = new Thread(Work);
      worker.Start();
    }

    internal void EnqueueLogEvents(ObservableCollection<LoggingService.LogEvent> logEvents)
    {
      //Queue the next burst of messages
      lock(locker)
      {
        logEventQueue.Enqueue(logEvents);
        //Is this Set conflicting with the WaitOne on the async call in Work?
        wh.Set();        
      }
    }

    private void Work()
    {
      while(true)
      {
        ObservableCollection<LoggingService.LogEvent> events = null;

        lock(locker)
        {
          if (logEventQueue.Count > 0)
          {
            events = logEventQueue.Dequeue();
            if (events == null || events.Count == 0) return;            
          }
        }

        if (events != null && events.Count > 0)
        {
          System.Diagnostics.Debug.WriteLine("1. Work - Sending {0} events", events.Count);

          //
          // This seems to be the key...
          // Send one burst of messages via an async call and wait until the async call completes.
          //
          loggingService.BeginLogEvents(events, ar =>
          {
            try
            {
              loggingService.EndLogEvents(ar);
              System.Diagnostics.Debug.WriteLine("3. Work - Back");
              wh.Set();
            }
            catch (Exception ex)
            {
            }
          }, null);

          System.Diagnostics.Debug.WriteLine("2. Work - Waiting");

          wh.WaitOne();

          System.Diagnostics.Debug.WriteLine("4. Work - Finished");
        }
        else
        {
          wh.WaitOne();
        }
      }
    }

    #region IDisposable Members

    public void Dispose()
    {
      EnqueueLogEvents(null);
      worker.Join();
      wh.Close();
    }

    #endregion
  }

В моем тесте это по сути называется так:

//Inside of LogManager, get the LoggingService and set up the queue.
ILoggingService loggingService = GetTheLoggingService();
ProducerConsumerQueue loggingQueue = new ProducerConsumerQueue(loggingService);

//Inside of client code, get a logger and log with it
ILog logger = LogManager.GetLogger("test");

for (int i = 0; i < 100; i++)
{
  logger.InfoFormat("logging message [{0}]", i);
}

Внутренне logger / LogManager накапливает некоторое количество сообщений регистрации (скажем, 25) перед добавлением этой группы сообщений в очередь. Примерно так:

internal void AddNewMessage(string message)
{
  lock(logMessages)
  {
    logMessages.Add(message);
    if (logMessages.Count >= 25)
    {
      ObservableCollection<LogMessage> messages = new ObservableCollection<LogMessage>(logMessages);
      logMessages.Clear();
      loggingQueue.EnqueueLogEvents(messages);
    }
  }
}

Итак, в этом случае я бы ожидал получить 4 пакета по 25 сообщений в каждом. Основываясь на инструкциях Debug в моем коде ProducerConsumerQueue (может быть, это не лучший способ отладки этого?), Я ожидаю увидеть что-то вроде этого:

  1. Работа - Отправка 25 событий
  2. Работа - Ожидание
  3. Работа - Назад
  4. Работа - Готово

Повторяется 4 раза.

Вместо этого я вижу что-то вроде этого:

* 1. Работа - Отправка 25 событий

* 2. Работа - Ожидание

* 4. Работа - Готово

* 1. Работа - Отправка 25 событий

* 2. Работа - Ожидание

* 3. Работа - Назад

* 4. Работа - Готово

* 1. Работа - Отправка 25 событий

* 2. Работа - Ожидание

* 3. Работа - Назад

* 4. Работа - Готово

* 1. Работа - Отправка 25 событий

* 2. Работа - Ожидание

* 3. Работа - Назад

* 3. Работа - Назад

* 4. Работа - Готово

(добавлено ведение *, чтобы строки не были пронумерованы SO)

Полагаю, я ожидал бы, что очередь позволила бы добавить несколько пакетов сообщений, но она бы полностью обработала один пакет (ожидание завершения вызова acync) перед обработкой следующего пакета. Кажется, он этого не делает. Кажется, он не ожидает надежно завершения асинхронного вызова. У меня есть вызов Set в EnqueueLogEvents, может быть, это отменяет WaitOne из Work метода?

Итак, у меня есть несколько вопросов: 1. Имеет ли смысл мое объяснение того, что я пытаюсь выполнить (понятно ли мое объяснение, не является ли это хорошей идеей или нет)?

  1. Является ли хорошей идеей то, что я пытаюсь (передать - от клиента - сообщения из одного потока, в порядке их появления, полностью обрабатывая один набор сообщений за раз)?

  2. Я рядом?

  3. Можно ли это сделать?

  4. Должно ли это быть сделано?

Спасибо за любую помощь!

[EDIT] После дополнительного расследования и благодаря предложению Брайана мы смогли заставить это работать. Я скопировал измененный код. Ключевым моментом является то, что мы теперь используем дескриптор ожидания wh для функций ProducerConsumerQueue. Вместо того, чтобы использовать wh для ожидания завершения асинхронного вызова, теперь мы ждем res.AsyncWaitHandle, который возвращается вызовом BeginLogEvents.

  internal class LoggingQueue : IDisposable
  {
    EventWaitHandle wh = new AutoResetEvent(false);
    Thread worker;
    readonly object locker = new object();
    bool working = false;

    Queue<ObservableCollection<LoggingService.LogEvent>> logEventQueue = new Queue<ObservableCollection<LoggingService.LogEvent>>();

    LoggingService.ILoggingService loggingService;

    internal LoggingQueue(LoggingService.ILoggingService loggingService)
    {
      this.loggingService = loggingService;
      worker = new Thread(Work);
      worker.Start();
    }

    internal void EnqueueLogEvents(ObservableCollection<LoggingService.LogEvent> logEvents)
    {
      lock (locker)
      {
        logEventQueue.Enqueue(logEvents);

        //System.Diagnostics.Debug.WriteLine("EnqueueLogEvents calling Set");

        wh.Set();
      }
    }

    private void Work()
    {
      while (true)
      {
        ObservableCollection<LoggingService.LogEvent> events = null;

        lock (locker)
        {
          if (logEventQueue.Count > 0)
          {
            events = logEventQueue.Dequeue();
            if (events == null || events.Count == 0) return;
          }
        }

        if (events != null && events.Count > 0)
        {
          //System.Diagnostics.Debug.WriteLine("1. Work - Sending {0} events", events.Count);

          IAsyncResult res = loggingService.BeginLogEvents(events, ar =>
          {
            try
            {
              loggingService.EndLogEvents(ar);
              //System.Diagnostics.Debug.WriteLine("3. Work - Back");
            }
            catch (Exception ex)
            {
            }
          }, null);

          //System.Diagnostics.Debug.WriteLine("2. Work - Waiting");

          // Block until async call returns.  We are doing this so that we can be sure that all logging messages
          // are sent FROM the client in the order they were generated.  ALSO, we don't want interleave blocks of logging
          // messages from the same client by sending a new block of messages before the previous block has been
          // completely processed.

          res.AsyncWaitHandle.WaitOne();

          //System.Diagnostics.Debug.WriteLine("4. Work - Finished");
        }
        else
        {
          wh.WaitOne();
        }
      }
    }

    #region IDisposable Members

    public void Dispose()
    {
      EnqueueLogEvents(null);
      worker.Join();
      wh.Close();
    }

    #endregion
  }

Как я уже упоминал в своем первоначальном вопросе и в комментариях к Джону и Брайану, я до сих пор не знаю, является ли выполнение всей этой работы хорошей идеей, но по крайней мере код выполняет то, что я хотел. Это означает, что у меня по крайней мере есть выбор сделать это таким или другим способом (например, восстановить порядок после факта), а не иметь выбор.

Ответы [ 2 ]

3 голосов
/ 09 ноября 2010

Могу ли я предположить, что существует простая альтернатива всей этой координации?Создайте последовательность с использованием дешевого монотонно увеличивающегося идентификатора (например, с Interlocked.Increment()), так что независимо от того, в каком порядке происходят события на клиенте или сервере, вы можете восстановить оригинал , заказывая позже.* Это должно позволить вам быть эффективным и гибким, отправляя все, что вы хотите, асинхронно, без ожидания подтверждения, но без потери порядка.

Очевидно, что это означает, что идентификатор (или, возможно, поле с гарантированной уникальной меткой времени) потребуетсябыть частью вашей службы WCF, но если вы контролируете оба конца, это должно быть достаточно просто.

1 голос
/ 09 ноября 2010

Причина, по которой вы получаете такой вид последовательности, заключается в том, что вы пытаетесь использовать тот же дескриптор ожидания, который использует очередь производитель-потребитель для другой цели. Это вызовет всевозможный хаос. В какой-то момент дела пойдут все хуже и хуже, и очередь в конечном итоге будет заблокирована. Вы действительно должны создать отдельный WaitHandle, чтобы дождаться завершения службы регистрации. Или, если BeginLoggingEvents соответствует стандартному шаблону, он вернет IAsyncResult, который содержит WaitHandle, который вы можете использовать вместо создания своего собственного.

В качестве дополнительного примечания, мне действительно не нравится шаблон «производитель-потребитель», представленный на веб-сайте Albarahi. Проблема в том, что это небезопасно для нескольких потребителей (очевидно, это вас не касается). И я говорю это при всем моем уважении, потому что считаю его сайт одним из лучших ресурсов для многопоточного программирования. Если вам доступна BlockingCollection , используйте вместо этого

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...