Веб-сервис Java JAX-RS: добавление узлов в XML-результат JAXB по завершении потоков - PullRequest
1 голос
/ 24 июля 2011

Я запрограммировал веб-сервис JAX-RS на Джерси, который запрашивает цены с разных веб-сайтов и возвращает результат в виде XML через аннотированные классы JAXB. К сожалению, некоторым веб-сайтам для ответа требуется до 15 секунд, поэтому я использую несколько потоков, чтобы узнать эти цены.

Я бы хотел написать клиент для этого веб-сервиса сейчас, и мои веб-пользователи не захотят ждать 30 секунд после того, как они нажмут «поиск» для получения результата, поэтому моя идея заключается в динамическом обновлении таблицы результатов как результатов мой веб-сервис JAX-RS возвращается.

Через 30 секунд мой веб-сервис должен отключиться и закрыть элемент <result> или после завершения всех потоков.

Прямо сейчас мой веб-сервис запускает все потоки и возвращает результат после завершения всех потоков. Я хотел бы динамически добавлять результаты в вывод XML по мере их поступления, как я могу это сделать?

Структура XML-ответа:

<result>
  <articles>
    <article>
    content of article
    </article>
  </articles>
  As the webservice gets results from websites it adds new articles to the XML
</result>

RequestController.java

@Path("/request")
public class RequestController {

    @GET
    @Produces("application/xml")
    public Response getRequest(@QueryParam("part") String part) {
        response = new Response();
        driverController = new DriverController(this.response, this.part);
        this.response = driverController.query();
        return this.response;
    }
}

DriverController.java

public class DriverController {


    public Response query() {
        CompletionService<Deque<Article>> completionService = new ExecutorCompletionService<Deque<Article>>(
                Worker.getThreadPool());
        final Deque<Article> articleQueue = new LinkedList<Article>();

        int submittedTasks = 0;

        // This threadwill take about 4 seconds to finish
        Driver driverA = new DriverA(this.part,
                this.currency, this.language);

        // This thread will take about 15 seconds to finish
        Driver driverN = new DriverN(this.part,
                this.currency, this.language);

        completionService.submit(driverA);
        submittedTasks++;
        completionService.submit(driverN);
        submittedTasks++;

        for (int i = 0; i < submittedTasks; i++) {
            log.info("Tasks: " + submittedTasks);
            try {
                Future<Deque<Article>> completedFuture = completionService.take();
                try {
                    Deque<Article> articleQueueFromThread = completedFuture.get();
                    if (articleQueueFromThread != null) {
                        articleQueue.addAll(articleQueueFromThread);
                        response.setStatus("OK");
                    }
                } catch (ExecutionException e) {
                    log.error(e.getMessage());
                    e.printStackTrace();
                }
            } catch (InterruptedException e) {
                log.error(e.getMessage());
                e.printStackTrace();
            }
        }
        for (Article article : articleQueue) {
            this.response.addArticle(article);
        }
        return this.response;
    }
}

Response.java

@XmlRootElement
public class Response {

    Queue<Article> queue = new ConcurrentLinkedQueue<Article>();
    private String status;
    private String code;
    private String message;
    private List<Article> articles = new ArrayList<Article>();

    public Response(){

    }

    public void setMessage(String message) {
        this.message = message;
    }
    @XmlAttribute
    public String getMessage() {
        return message;
    }
    public void setStatus(String status) {
        this.status = status;
    }
    @XmlAttribute
    public String getStatus() {
        return status;
    }
    public void setCode(String code) {
        this.code = code;
    }
    @XmlAttribute
    public String getCode() {
        return code;
    }

    public void addArticle(Article article) {
        this.articles.add(article);
        System.out.println("Response: ADDED ARTICLE TO RESPONSE");
    }
    @XmlElement(name = "article")
    @XmlElementWrapper(name = "articles")
    public List<Article> getArticles() {
        return articles;
    }

}

Ответы [ 2 ]

2 голосов
/ 31 июля 2011

Я начал адаптировать ваш код, чтобы сделать это, но я решил, что было легче разработать независимый пример.В примере запускается сервер Grizzly + Jersey с одним классом ресурсов.GET на ресурсе порождает три потока, которые задерживаются на 2, 4 и 6 секунд перед возвратом некоторых объектов.После запуска сервера другой поток делает запрос к серверу.Когда вы запускаете его, вы можете ясно видеть, что запросчик получает куски XML, когда соответствующие потоки завершают свою работу на сервере.Единственное, чего он не делает, - это упаковывает отдельно доставленные куски XML в один корневой элемент, поскольку это должно быть относительно тривиально.

Весь исполняемый источник приведен ниже, и если у вас есть maven и git, вы можетеклонируйте его из github и запустите:

git clone git://github.com/zzantozz/testbed.git tmp
cd tmp
mvn compile exec:java -Dexec.mainClass=rds.jersey.JaxRsResource -pl jersey-with-streaming-xml-response

Источник:

import com.sun.grizzly.http.SelectorThread;
import com.sun.jersey.api.container.grizzly.GrizzlyWebContainerFactory;
import javax.ws.rs.*;
import javax.ws.rs.core.StreamingOutput;
import javax.xml.bind.*;
import javax.xml.bind.annotation.*;
import java.io.*;
import java.net.*;
import java.util.*;
import java.util.concurrent.*;

@Path("/streaming")
public class JaxRsResource {
    private static ExecutorService executorService = Executors.newFixedThreadPool(4);
    private static int fooCounter;
    private Marshaller marshaller;

    public JaxRsResource() throws JAXBException {
        marshaller = JAXBContext.newInstance(Foo.class).createMarshaller();
        marshaller.setProperty("jaxb.fragment", Boolean.TRUE);
    }

    @GET
    @Produces("application/xml")
    public StreamingOutput streamStuff() {
        System.out.println("Got request for streaming resource; starting delayed response threads");
        final List<Future<List<Foo>>> futureFoos = new ArrayList<Future<List<Foo>>>();
        futureFoos.add(executorService.submit(new DelayedFoos(2)));
        futureFoos.add(executorService.submit(new DelayedFoos(4)));
        futureFoos.add(executorService.submit(new DelayedFoos(6)));
        return new StreamingOutput() {
            public void write(OutputStream output) throws IOException {
                for (Future<List<Foo>> futureFoo : futureFoos) {
                    writePartialOutput(futureFoo, output);
                    output.write("\n".getBytes());
                    output.flush();
                }
            }
        };
    }

    private void writePartialOutput(Future<List<Foo>> futureFoo, OutputStream output) {
        try {
            List<Foo> foos = futureFoo.get();
            System.out.println("Server sending a chunk of XML");
            for (Foo foo : foos) {
                marshaller.marshal(foo, output);
            }
        } catch (JAXBException e) {
            throw new IllegalStateException("JAXB couldn't marshal. Handle it.", e);
        } catch (InterruptedException e) {
            throw new IllegalStateException("Task was interrupted. Handle it.", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Task failed to execute. Handle it.", e);
        }
    }

    class DelayedFoos implements Callable<List<Foo>> {
        private int delaySeconds;

        public DelayedFoos(int delaySeconds) {
            this.delaySeconds = delaySeconds;
        }

        public List<Foo> call() throws Exception {
            Thread.sleep(delaySeconds * 1000);
            return Arrays.asList(new Foo(fooCounter++), new Foo(fooCounter++), new Foo(fooCounter++));
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("Starting Grizzly with the JAX-RS resource");
        final String baseUri = "http://localhost:9998/";
        final Map<String, String> initParams = new HashMap<String, String>();
        initParams.put("com.sun.jersey.config.property.packages", "rds.jersey");
        SelectorThread threadSelector = GrizzlyWebContainerFactory.create(baseUri, initParams);
        System.out.println("Grizzly started");
        System.out.println("Starting a thread to request the streamed XML");
        executorService.submit(new HttpRequester(baseUri + "streaming"));
    }
}

@XmlRootElement
class Foo {
    @XmlElement
    private int id;

    Foo() {}

    public Foo(int id) {
        this.id = id;
    }
}

class HttpRequester implements Runnable {
    private String url;

    public HttpRequester(String url) {
        this.url = url;
    }

    public void run() {
        try {
            System.out.println("Doing HTTP GET on " + url);
            HttpURLConnection urlConnection = (HttpURLConnection) new URL(url).openConnection();
            BufferedReader in = new BufferedReader(new InputStreamReader(urlConnection.getInputStream()));
            String line;
            while ((line = in.readLine()) != null) {
                System.out.println("Client got: " + line);
            }
            System.exit(0);
        } catch (IOException e) {
            throw new IllegalStateException("Some bad I/O happened. Handle it.", e);
        }
    }
}

Важные моменты / различия, на которые следует обратить внимание:

  1. Возвращение ответаиз вашего ресурса метод указывает, что весь ответ содержится в этом объекте и не допускает инкрементных обновлений ответа.Вместо этого верните StreamingOutput.Это говорит Джерси, что вы будете отправлять обратно поток данных, который вы можете добавлять по своему желанию, пока не закончите.StreamingOutput дает вам доступ к OutputStream, который используется для отправки инкрементных обновлений и является ключом ко всему этому.Конечно, это означает, что вы должны справиться с маршалингом самостоятельно.Джерси может выполнять маршалинг только в том случае, если вы возвращаете весь ответ за один раз.
  2. Поскольку OutputStream - это то, как вы отправляете данные постепенно, вы должны либо выполнять потоки в своем JAX-Ресурс RS или передайте OutputStream вашему DriverController и запишите его туда.
  3. Обязательно вызовите flush () для OutputStream, если вы хотите заставить его немедленно отправлять данные.В противном случае клиенту ничего не будет отправлено, пока не будет заполнен какой-либо внутренний буфер.Обратите внимание, что вызов flush () самостоятельно обходит цель буфера и делает ваше приложение более разговорчивым.

В общем, чтобы применить это к вашему проекту, главное, что нужно сделать, это изменить метод ресурсовчтобы вернуть реализацию StreamingOutput и вызвать ваш DriverController изнутри этой реализации, передав OutputStream в DriverController.Затем в DriverController, когда вы возвращаете некоторые статьи из потока, вместо того, чтобы добавлять их в очередь на потом, немедленно запишите их в OutputStream.

0 голосов
/ 05 августа 2011

@ Райан Стюарт: как бы мы решили ту же проблему в axis2.x среде, основанной на SOAP, и в виде HTML-страницы в качестве веб-клиента.Я думаю, что DriverController может сохранять объекты Future в сеансе и возвращает клиенту самый первый доступный ответ (статью) с уникальным идентификатором сеанса ... затем клиент может сделать еще один вызов веб-службы (предпочтительно через Ajax + jquery), передав сохраненный идентификатор сеанса, которыйвызовет DriverController для поиска дополнительных результатов и отправки обратно .... это жизнеспособное решение?Будет ли это применимо и к вышеуказанным условиям.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...