Я начал адаптировать ваш код, чтобы сделать это, но я решил, что было легче разработать независимый пример.В примере запускается сервер Grizzly + Jersey с одним классом ресурсов.GET на ресурсе порождает три потока, которые задерживаются на 2, 4 и 6 секунд перед возвратом некоторых объектов.После запуска сервера другой поток делает запрос к серверу.Когда вы запускаете его, вы можете ясно видеть, что запросчик получает куски XML, когда соответствующие потоки завершают свою работу на сервере.Единственное, чего он не делает, - это упаковывает отдельно доставленные куски XML в один корневой элемент, поскольку это должно быть относительно тривиально.
Весь исполняемый источник приведен ниже, и если у вас есть maven и git, вы можетеклонируйте его из github и запустите:
git clone git://github.com/zzantozz/testbed.git tmp
cd tmp
mvn compile exec:java -Dexec.mainClass=rds.jersey.JaxRsResource -pl jersey-with-streaming-xml-response
Источник:
import com.sun.grizzly.http.SelectorThread;
import com.sun.jersey.api.container.grizzly.GrizzlyWebContainerFactory;
import javax.ws.rs.*;
import javax.ws.rs.core.StreamingOutput;
import javax.xml.bind.*;
import javax.xml.bind.annotation.*;
import java.io.*;
import java.net.*;
import java.util.*;
import java.util.concurrent.*;
@Path("/streaming")
public class JaxRsResource {
private static ExecutorService executorService = Executors.newFixedThreadPool(4);
private static int fooCounter;
private Marshaller marshaller;
public JaxRsResource() throws JAXBException {
marshaller = JAXBContext.newInstance(Foo.class).createMarshaller();
marshaller.setProperty("jaxb.fragment", Boolean.TRUE);
}
@GET
@Produces("application/xml")
public StreamingOutput streamStuff() {
System.out.println("Got request for streaming resource; starting delayed response threads");
final List<Future<List<Foo>>> futureFoos = new ArrayList<Future<List<Foo>>>();
futureFoos.add(executorService.submit(new DelayedFoos(2)));
futureFoos.add(executorService.submit(new DelayedFoos(4)));
futureFoos.add(executorService.submit(new DelayedFoos(6)));
return new StreamingOutput() {
public void write(OutputStream output) throws IOException {
for (Future<List<Foo>> futureFoo : futureFoos) {
writePartialOutput(futureFoo, output);
output.write("\n".getBytes());
output.flush();
}
}
};
}
private void writePartialOutput(Future<List<Foo>> futureFoo, OutputStream output) {
try {
List<Foo> foos = futureFoo.get();
System.out.println("Server sending a chunk of XML");
for (Foo foo : foos) {
marshaller.marshal(foo, output);
}
} catch (JAXBException e) {
throw new IllegalStateException("JAXB couldn't marshal. Handle it.", e);
} catch (InterruptedException e) {
throw new IllegalStateException("Task was interrupted. Handle it.", e);
} catch (ExecutionException e) {
throw new IllegalStateException("Task failed to execute. Handle it.", e);
}
}
class DelayedFoos implements Callable<List<Foo>> {
private int delaySeconds;
public DelayedFoos(int delaySeconds) {
this.delaySeconds = delaySeconds;
}
public List<Foo> call() throws Exception {
Thread.sleep(delaySeconds * 1000);
return Arrays.asList(new Foo(fooCounter++), new Foo(fooCounter++), new Foo(fooCounter++));
}
}
public static void main(String[] args) throws IOException {
System.out.println("Starting Grizzly with the JAX-RS resource");
final String baseUri = "http://localhost:9998/";
final Map<String, String> initParams = new HashMap<String, String>();
initParams.put("com.sun.jersey.config.property.packages", "rds.jersey");
SelectorThread threadSelector = GrizzlyWebContainerFactory.create(baseUri, initParams);
System.out.println("Grizzly started");
System.out.println("Starting a thread to request the streamed XML");
executorService.submit(new HttpRequester(baseUri + "streaming"));
}
}
@XmlRootElement
class Foo {
@XmlElement
private int id;
Foo() {}
public Foo(int id) {
this.id = id;
}
}
class HttpRequester implements Runnable {
private String url;
public HttpRequester(String url) {
this.url = url;
}
public void run() {
try {
System.out.println("Doing HTTP GET on " + url);
HttpURLConnection urlConnection = (HttpURLConnection) new URL(url).openConnection();
BufferedReader in = new BufferedReader(new InputStreamReader(urlConnection.getInputStream()));
String line;
while ((line = in.readLine()) != null) {
System.out.println("Client got: " + line);
}
System.exit(0);
} catch (IOException e) {
throw new IllegalStateException("Some bad I/O happened. Handle it.", e);
}
}
}
Важные моменты / различия, на которые следует обратить внимание:
- Возвращение ответаиз вашего ресурса метод указывает, что весь ответ содержится в этом объекте и не допускает инкрементных обновлений ответа.Вместо этого верните StreamingOutput.Это говорит Джерси, что вы будете отправлять обратно поток данных, который вы можете добавлять по своему желанию, пока не закончите.StreamingOutput дает вам доступ к OutputStream, который используется для отправки инкрементных обновлений и является ключом ко всему этому.Конечно, это означает, что вы должны справиться с маршалингом самостоятельно.Джерси может выполнять маршалинг только в том случае, если вы возвращаете весь ответ за один раз.
- Поскольку OutputStream - это то, как вы отправляете данные постепенно, вы должны либо выполнять потоки в своем JAX-Ресурс RS или передайте OutputStream вашему DriverController и запишите его туда.
- Обязательно вызовите flush () для OutputStream, если вы хотите заставить его немедленно отправлять данные.В противном случае клиенту ничего не будет отправлено, пока не будет заполнен какой-либо внутренний буфер.Обратите внимание, что вызов flush () самостоятельно обходит цель буфера и делает ваше приложение более разговорчивым.
В общем, чтобы применить это к вашему проекту, главное, что нужно сделать, это изменить метод ресурсовчтобы вернуть реализацию StreamingOutput и вызвать ваш DriverController изнутри этой реализации, передав OutputStream в DriverController.Затем в DriverController, когда вы возвращаете некоторые статьи из потока, вместо того, чтобы добавлять их в очередь на потом, немедленно запишите их в OutputStream.