Как я могу реализовать протокол запроса-ответа без блокировки в ожидании ответа? - PullRequest
3 голосов
/ 22 октября 2011

Мне нужно реализовать приложение, которое одновременно обменивается данными с несколькими клиентами, используя (двунаправленный) протокол запроса-ответа. Ранее я реализовал это, используя два выделенных потока для каждого клиента (один читатель / реактор и один писатель / инициатор). Проблема в том, что управление потоками стало довольно сложным и безобразным. Существует ли какой-либо стандартный способ обработки этого, возможно, даже с одним потоком или, по крайней мере, с постоянным количеством потоков для обработки всех клиентов?

Это то, как некоторые коммуникации будут выглядеть в потоке с реализацией блокировки:

Command response = request("cmd1", "a", "b");
if(!response.is("OK")) {
    return;
}
response = request("cmd2", "c");
if(!response.is("OK")) {
    return;
}
response = request("cmd3");

Между отправкой запроса и ожиданием соответствующего ответа я хотел бы, чтобы текущий поток мог выполнять другую работу, но затем продолжить выполнение, как только ответ действительно придет.

Я знаю, что можно было бы использовать асинхронный ввод-вывод и зарегистрировать экземпляры Java Future / Runnable для запуска после получения ответа на запрос, но это легко превращается в многоуровневое вложение анонимных подклассов Runnable, и я подозреваю, что это будет больше боли, чем стоит. Это может привести к чему-то похожему на приведенный ниже пример, который быстро становится сильно вложенным и нечитаемым. Неужели должен быть более простой способ?

request("cmd1", "a", "b", new ResponseHandler() {
    public void response(Command response) {
        if(response.is("OK")) {
            request("cmd2", "c", new ResponseHandler() {
                public void response(Command response) {
                    if(response.is("OK")) {
                        request("cmd3", new NullResponseHandler());
                    }
                }});
        }
    }});

Я также рассмотрел возможность использования выделенной Actor Framework для обработки логики запрос-ответ. Хотя они выглядят так, как будто они могут помочь в этой ситуации, я никогда раньше не работал с такими платформами, поэтому я не знаю, подходят ли они для этой ситуации.

Короче говоря, мой вопрос: как я могу обрабатывать произвольное количество соединений запрос-ответ неблокирующим образом, чтобы было достаточно постоянного количества потоков? Являются ли рамки Actor правильным подходом?

PS. Я использую Java для этого проекта.

Ответы [ 2 ]

2 голосов
/ 22 октября 2011

Я думаю, что это должно быть довольно просто с использованием некоторой инфраструктуры Java NIO, например, netty .Я не использовал фреймворк Actor, поэтому не знаю, будет ли это лучше подходить.

По сути, вы создаете один класс, который обрабатывает всю информацию и хранит необходимую информацию - фреймворк обрабатывает все потоки вфон вы просто предоставляете метод для например messageReceived и обрабатываете его там.

Недостатком является то, что вы должны в основном написать свой собственный конечный автомат, который может быть не таким простым - но это, безусловно, самый простой способ использованияNIO.

Пример:

enum State {
    S0, S1
}
private State state = State.S0;
public void messageReceived(
        ChannelHandlerContext ctx, MessageEvent e) {
    switch(state) {
    case S0:
        // read object from channel and write appropriate response
        e.getChannel().write("HELO");  // writes are asynchronous 
        state = State.S1;
        break;
    case S1:
        // same as S0
        e.getChannel().write("DONE");
        break;
    }
}

Обратите внимание, что чтение учебника по netty по-прежнему абсолютно необходимо, потому что в NIO есть вещи, которые не объясняют сами себя (по крайней мере, они не былимне).Но есть несколько примеров, с которыми легко работать, и которые должны научить вас основам.

1 голос
/ 22 октября 2011

Я думаю, что использование неблокирующего ввода-вывода Javas - это путь.Оттуда вы можете просто добавить ответ в BlockingQueue.Тогда может быть один поток, получающий элементы из BlockingQueue (или блокирующий, если элементов там нет) и обрабатывающий ответы последовательно.

...