Как закрыть соединение в CometProcessor на Tomcat7? - PullRequest
0 голосов
/ 08 февраля 2012

Я пишу длинный сервис опросов для доставки событий AJAX. Для этого я использую сервлет Tomcat7 CometProcessor. Сервлет похож на пример сервлета чата Tomcat.

Для каждого входящего запроса создается объект Connection с CometEvent, HttpServletRequest, HttpServletResponse и некоторыми параметрами запроса. Запрос хранится в мультикарте для userId, то есть для каждого userId может быть один или несколько открытых запросов / соединений.

События AJAX отправляются в отдельном потоке, который запускается всякий раз, когда поступают новые данные для пользователя. SenderThread получает список событий, список открытых соединений для этого пользователя и отправляет события каждому соединению.

Теперь возникает вопрос: Когда данные были отправлены в SenderThread Я хочу закрыть соединение. Как мне это сделать? Нужно ли вызывать event.close() в потоке отправителя, нужно ли закрыть OutputStream или установить заголовок Connection: close?

Я пробовал разные вещи, но, похоже, ни одна из них не работает должным образом, и документация на это довольно тонкая.

Приведенный ниже код должен просто иллюстрировать, как выглядит код, поскольку реальный код слишком длинный. Так что не смотрите на проверку ошибок или условия гонки.

public class AjaxServlet extends HttpServlet implements CometProcessor {
    ConcurrentMultiMap connections = new ConcurrentMultiMap();
    SenderThread senderThread = new SenderThread();

    public void event(CometEvent event) {
        HttpServletRequest req = event.getHttpServletRequest();
        HttpServletResponse res = event.getHttpServletResponse();

        // error checking omitted
        String userId = req.getParameter("userId");
        Long lastId = Long.parseLong(req.getParameter("lastId"));
        Long requestId = req.getAttribute("requestId");

        switch(event.getEventType()) {      
            case BEGIN:
                event.setTimeout(30000L);
                req.setAttribute("requestId", requestId.incrementAndGet());
                addConnection(userId, requestId, lastId, event);
                break;

            case END:
                removeConnection(userId, requestId);
                event.close();
                break;

            case ERROR:
                boolean timeout = event.getSubType().equals(TIMEOUT);
                if (timeout)
                    res.setStatus(408);
                event.close();
                break;
        }
    }

    public void addConnection(...) {
        // add new connection to connections
    }

    public void removeConnection(long userId, long requestId) {
        // remove connection from connections
    }

    public void init() {
        Thread t = new Thread(senderThread);
        t.setDaemon(true);
        t.start();
    }


    class Connection {
        CometEvent event;       
        long requestId;
        long lastId;

        Connection(CometEvent event, long requestId, long lastId) {
            // details omitted
        }
    }

    class SenderThread implements Runnable {
        private boolean running = true;
        private BlockingQueue<Long> queue = new LinkedBlockingQueue<Long>();

        public void stop() {
            running = false;
        }

        public void pushToUser(long userId) {
            queue.add(userId);
        }

        public void run() {
            while (running) {
                try {
                    long userId = queue.take(); // block until 
                    List<Event> events = eventService.forUser(userId);
                    if (!events.isEmpty()) {
                        List<Connection> conn = connections.get(userId);
                        for (Connection c:connections.get(userId)) {
                            sendEvents(events);
                            // HOW DO I CLOSE THE CONNECTION HERE?
                            // removeConnection(userId, c.requestId);
                            // c.event.close();
                        }                   
                    }

                } catch(Exception e) {
                    // error checking omitted
                }
            }
        }

        private void sendEvents(List<Event> events) {
            String json = ... 
            byte[] data = json.getBytes("UTF-8");
            response.setStatus(200);
            response.setContentType("application/json");
            response.setCharacterEncoding("UTF-8");
            response.setContentLength(data.length);
            OutputStream os = response.getOutputStream();
            os.write(b);
            os.flush();
            os.close(); // WILL THIS CLOSE THE CONNECTION?
        }
    }
}
...