Предотвращение условий гонки для обработки сообщений - PullRequest
2 голосов
/ 04 октября 2011

У меня есть приложение J2EE, которое получает сообщения (события) через веб-сервис. Сообщения имеют различные типы (требующие различной обработки в зависимости от типа) и отправляются в определенной последовательности. Выявлена ​​проблема, когда некоторые типы сообщений обрабатываются дольше, чем другие. В результате сообщение, полученное вторым в последовательности, может быть обработано раньше, чем первое в последовательности. Я попытался решить эту проблему, поместив синхронизированный блок вокруг метода, который обрабатывает сообщения. Кажется, это работает, но я не уверен, что это «правильный» подход? Возможно, есть альтернатива, которая может быть более подходящей или это "приемлемо"? Я включил небольшой фрагмент кода, чтобы попытаться объяснить более четко. .... Любой совет / руководство приветствуется.

public class EventServiceImpl implements EventService {
  public String submit (String msg) {

    if (msg == null)
        return ("NAK");

            EventQueue.getInstance().submit(msg);

    return "ACK";
  }
}


public class EventQueue {
    private static EventQueue instance = null;
    private static int QUEUE_LENGTH = 10000;
    protected boolean done = false;
    BlockingQueue<String> myQueue = new LinkedBlockingQueue<String>(QUEUE_LENGTH);

protected EventQueue() {
    new Thread(new Consumer(myQueue)).start();
}

public static EventQueue getInstance() {
      if(instance == null) {
         instance = new EventQueue();
      }
      return instance;
}

public void submit(String event) {
    try {
        myQueue.put(event);
    } catch (InterruptedException ex) {
    }
}

class Consumer implements Runnable {
    protected BlockingQueue<String> queue;

    Consumer(BlockingQueue<String> theQueue) { this.queue = theQueue; }

    public void run() {
      try {
        while (true) {
          Object obj = queue.take();
          process(obj);
          if (done) {
            return;
          }
        }
      } catch (InterruptedException ex) {
      }
    }

    void process(Object obj) {
        Event event = new Event( (String) obj);
        EventHandler handler = EventHandlerFactory.getInstance(event);
        handler.execute();
    }
}

// Close queue gracefully
public void close() {
    this.done = true;
}

Ответы [ 2 ]

4 голосов
/ 04 октября 2011

Я не уверен, с какой структурой (EJB (MDB) / JMS) вы работаете. Обычно следует избегать использования синхронизации внутри Managed Environment , подобной EJB / JMS (это не очень хорошая практика). Один из способов обойти это

  • клиент должен дождаться подтверждения от сервера, прежде чем он отправит следующее сообщение.
  • таким образом ваш клиент сам будет контролировать последовательность событий.

Обратите внимание, что это не будет работать, если сообщения отправляются несколькими клиентами.

РЕДАКТИРОВАТЬ:

В вашей ситуации клиент веб-службы отправляет сообщение последовательно, без учета времени обработки сообщения. Это просто сбрасывает сообщение один за другим. Это хороший пример решения Queue (First In First Out) . Я предлагаю следующие два способа сделать это

  1. Использовать JMS . Это потребует дополнительных затрат на добавление JMS-провайдеров и написание некоторого кода.
  2. Используйте шаблон с несколькими заголовками, например Producer-Consumer , в котором ваш обработчик веб-службы будет выгружать входящее сообщение в очередь, а однопоточный потребитель будет использовать по одному сообщению за раз. См. этот пример с использованием пакета java.util.concurrent.
  3. Использовать базу данных. Дамп входящих сообщений в базу данных. Используйте другую программу на основе планировщика для сканирования базы данных (на основе порядкового номера) и соответствующей обработки сообщений.

    Первое и третье решение очень стандартно для подобных проблем. Второй подход будет быстрым и не потребует дополнительных библиотек в вашем коде.

2 голосов
/ 04 октября 2011

Если события должны обрабатываться в определенной последовательности, то почему бы не попробовать добавить в сообщения поля «eventID» и «orderID»?Таким образом, ваш класс EventServiceImpl может сортировать, упорядочивать и затем выполнять в правильном порядке (независимо от порядка их создания и / или доставки обработчику).

Синхронизация блока handler.execute() не приведет к желаемомуРезультаты, я ожидаю.Все ключевое слово synchronized не позволяет нескольким потокам выполнять этот блок одновременно.Он ничего не делает в плане правильного упорядочения следующего потока.

Если блок synchronized действительно работает, то я утверждаю, что вам очень повезло в том, что сообщения создаются, доставляютсяи затем действовал в правильном порядке.В многопоточной среде это не гарантировано!Я бы предпринял шаги, чтобы убедиться, что вы управляете этим, а не полагаетесь на удачу.

Пример:

  1. Сообщения создаются в порядке 'client01-A', 'client01-C ',' client01-B ',' client01-D '
  2. Сообщения поступают к обработчику в следующем порядке: client01-D, client01-B, client01-A, client01-C '
  3. EventHandler может различать сообщения от одного клиента к другому и начинает кэшировать сообщения "client01".
  4. EventHandler recv' client01-A 'сообщение и знает, что может обработать это и делает
  5. EventHandler ищет в кэше сообщение «client01-B», находит и обрабатывает его.
  6. EventHandler не может найти «client01-C», поскольку оно еще не пришло.
  7. EventHandler запрашивает «client01-C» и обрабатывает его.
  8. EventHandler ищет в кеше «client01-D», находит его, обрабатывает и считает, что взаимодействие «client01» завершено.

Что-то в этом духе обеспечит надлежащую обработку ибудет способствовать хорошему использованию нескольких потоков.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...