Since I am having problems with OpenCV's mjpeg stream handling, e.g. for Logitech cameras, I would like to build an mjpeg stream handler using RxJava. The goal is to create an Image Observable that is fed from an mjpeg URL. Below is the current state of the code.
There are a few issues I would like to get solutions/answers for:
- Why does the StringObservable seem to return the same first frame every time instead of advancing in the underlying input stream on each onNext call?
- How can the Subscriber be turned into an emitter of JPeg images?
With these two answers I assume this could become a good solution. Any further thoughts?
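To make the goal more concrete, this is roughly the usage I would like to end up with (just a sketch; imageObservable() is hypothetical and is exactly what the second question is about):

MJpegHandler handler = new MJpegHandler("http://213.193.89.202/axis-cgi/mjpg/video.cgi");
MJpegDecoder decoder = handler.open(1024 * 64);
// hypothetical method - this is what I am trying to get to
decoder.imageObservable()
    .subscribe(image -> System.out.println("frame " + image.getWidth() + "x" + image.getHeight()));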
Unit Test
@Test
public void testMJpegStream() throws Exception {
  // Dorf Appenzell
  String url = "http://213.193.89.202/axis-cgi/mjpg/video.cgi";
  // url = "http://localhost:8081?type=simulator&mode=stream";
  // url = "http://picarford:8080/?action=stream";
  MJpegHandler mjpegHandler = new MJpegHandler(url);
  MJpegDecoder.debug = true;
  int bufferSize = 1024 * 64; // 64 KByte Buffer
  MJpegDecoder mjpegDecoder = mjpegHandler.open(bufferSize);
  Thread.sleep(1000);
  mjpegDecoder.close();
}
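The fixed Thread.sleep(1000) makes the test timing dependent; an alternative would be to poll for the first decoded frame instead. A sketch only, assuming a hypothetical getFrameIndex() accessor on MJpegDecoder that does not exist in the code below:

long timeout = System.currentTimeMillis() + 5000;
while (mjpegDecoder.getFrameIndex() == 0 && System.currentTimeMillis() < timeout) {
  Thread.sleep(50); // wait until the decoder has seen at least one frame
}
assertTrue("expected at least one decoded frame", mjpegDecoder.getFrameIndex() > 0);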
MJpegHandler
package org.rcdukes.imageview;
import static org.asynchttpclient.Dsl.asyncHttpClient;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.concurrent.Future;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.Response;
import org.asynchttpclient.handler.BodyDeferringAsyncHandler;
import org.asynchttpclient.handler.BodyDeferringAsyncHandler.BodyDeferringInputStream;
public class MJpegHandler {
  AsyncHttpClient asyncHttpClient;
  private PipedInputStream pipedInputStream;
  private PipedOutputStream pipedOutputStream;
  private BodyDeferringAsyncHandler outputHandler;
  BodyDeferringInputStream inputStream;

  public BodyDeferringInputStream getInputStream() {
    return inputStream;
  }

  /**
   * construct me for the mjpeg stream at the given url
   *
   * @param url - the url of the MJPeg stream
   * @throws Exception
   */
  public MJpegHandler(String url) throws Exception {
    // https://stackoverflow.com/a/50402629/1497139
    asyncHttpClient = asyncHttpClient();
    pipedInputStream = new PipedInputStream();
    pipedOutputStream = new PipedOutputStream(pipedInputStream);
    outputHandler = new BodyDeferringAsyncHandler(pipedOutputStream);
    Future<Response> futureResponse = asyncHttpClient.prepareGet(url)
        .execute(outputHandler);
    // getResponse() blocks until the response headers are available
    Response response = outputHandler.getResponse();
    if (response.getStatusCode() == 200) {
      inputStream = new BodyDeferringAsyncHandler.BodyDeferringInputStream(
          futureResponse, outputHandler, pipedInputStream);
    }
  }

  public MJpegHandler() {
  }

  /**
   * open a decoder for this handler with the given bufferSize
   *
   * @param bufferSize - the buffer size in bytes
   * @return the MJpegDecoder subscribed to this handler's stream
   * @throws Exception
   */
  public MJpegDecoder open(int bufferSize) throws Exception {
    MJpegDecoder mjpegDecoder = new MJpegDecoder(this);
    mjpegDecoder.open(bufferSize);
    return mjpegDecoder;
  }

  /**
   * close this handler
   *
   * @throws IOException
   */
  public void close() throws IOException {
    this.asyncHttpClient.close();
  }
}
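To narrow down the first question, the raw stream can be checked independently of StringObservable by reading a few buffers directly from the handler's input stream; if the bytes differ between reads, the stream itself advances and the problem is on the Rx side. A minimal debugging sketch (run inside a test method that throws Exception, using the Axis demo URL from the unit test and an extra import of java.io.InputStream):

MJpegHandler handler = new MJpegHandler("http://213.193.89.202/axis-cgi/mjpg/video.cgi");
InputStream in = handler.getInputStream();
byte[] buffer = new byte[1024 * 64];
for (int i = 0; i < 3; i++) {
  int read = in.read(buffer);
  // if the stream advances, the first byte and the read counts should vary between iterations
  System.out.printf("read %d: %d bytes, first byte 0x%02X%n", i, read, buffer[0] & 0xFF);
}
handler.close();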
MJpegDecoder
package org.rcdukes.imageview;
import java.io.ByteArrayOutputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import org.asynchttpclient.handler.BodyDeferringAsyncHandler.BodyDeferringInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscriber;
import rx.observables.StringObservable;
import rx.schedulers.Schedulers;
/**
* reactive MJPegDecoder
*
* @author wf
*
*/
public class MJpegDecoder extends Subscriber<byte[]> {
  protected static final Logger LOG = LoggerFactory
      .getLogger(MJpegDecoder.class);
  // previous byte - kept as a field so that frames spanning two buffers are detected
  int prev = 0;
  private ByteArrayOutputStream jpgOut;
  private byte[] curFrame;
  public static boolean debug = false;
  private int bufferIndex = 0;
  private int frameIndex = 0;
  FileOutputStream fos;
  private int bufferSize;
  private Observable<byte[]> mjpegSubscription;
  private MJpegHandler mjpegHandler;

  /**
   * create a decoder for the given MJpegHandler
   *
   * @param mJpegHandler - the handler that provides the mjpeg input stream
   */
  public MJpegDecoder(MJpegHandler mJpegHandler) {
    this.mjpegHandler = mJpegHandler;
    this.curFrame = new byte[0];
    if (debug) {
      try {
        fos = new FileOutputStream("/tmp/decoder.mjpg");
      } catch (FileNotFoundException e) {
        handle(e);
      }
    }
  }
  @Override
  public void onCompleted() {
    try {
      this.mjpegHandler.close();
      if (fos != null) {
        fos.close();
      }
      fos = null;
    } catch (IOException e) {
      onError(e);
    }
  }

  @Override
  public void onError(Throwable e) {
    handle(e);
  }
  @Override
  public void onNext(byte[] buffer) {
    if (debug) {
      String msg = String.format("buffer %6d available %9d kB read",
          ++bufferIndex, bufferIndex * bufferSize / 1024);
      LOG.info(msg);
      try {
        fos.write(curFrame);
        fos.flush();
      } catch (IOException e) {
        handle(e);
      }
    }
    // loop over all bytes in the buffer
    // note: bytes are sign-extended to int here, so e.g. 0xFF shows up as 0xFFFFFFFF
    for (int cur : buffer) {
      // Content-Type: multipart/x-mixed-replace; boundary=
      // will have -- we could detect it here
      if (debug) {
        if (prev == 0x2D && cur == 0x2D) {
          LOG.info("boundary detected");
        }
      }
      // check for the JPEG start marker 0xFF 0xD8
      if (prev == 0xFFFFFFFF && cur == 0xFFFFFFD8) {
        if (debug) {
          String msg = String.format("frame %6d started", frameIndex + 1);
          LOG.info(msg);
        }
        jpgOut = new ByteArrayOutputStream(bufferSize);
        // the first marker byte needs to be written to the output as well
        jpgOut.write((byte) prev);
      }
      // if within a frame write all bytes
      if (jpgOut != null) {
        jpgOut.write((byte) cur);
        // check for the JPEG end marker 0xFF 0xD9
        // if found the frame is finished
        if (prev == 0xFFFFFFFF && cur == 0xFFFFFFD9) {
          // create the byte array of the current jpeg frame
          curFrame = jpgOut.toByteArray();
          try {
            jpgOut.close();
            jpgOut = null;
          } catch (IOException e) {
            onError(e);
          }
          if (debug) {
            String msg = String.format("frame %6d available", ++frameIndex);
            LOG.info(msg);
          }
          // emit the current frame
        }
      }
      prev = cur;
    }
  }
  private void handle(Throwable th) {
    LOG.error(th.getMessage());
    if (debug)
      th.printStackTrace();
  }

  /**
   * open me with the given bufferSize
   *
   * @param bufferSize
   *          e.g. 64 KByte Buffer - 10.5 msecs/100 FPS at 1920x1080
   *          1000/(1920*1080*3/1024/64)
   */
  public void open(int bufferSize) {
    this.bufferSize = bufferSize;
    BodyDeferringInputStream inputStream = this.mjpegHandler.getInputStream();
    if (inputStream != null) {
      mjpegSubscription = StringObservable.from(inputStream, bufferSize)
          .subscribeOn(Schedulers.io()).observeOn(Schedulers.newThread());
      mjpegSubscription.subscribe(this);
    }
  }
  /**
   * close me
   */
  public void close() {
    this.unsubscribe();
    this.onCompleted();
  }
}
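Regarding the second question, the idea I am leaning towards is to let the decoder push each completed frame into an rx Subject and expose the decoded images as an Observable. A sketch only, assuming RxJava 1.x; frameSubject and imageObservable() are hypothetical additions to MJpegDecoder and not part of the code above:

import java.awt.image.BufferedImage;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import javax.imageio.ImageIO;
import rx.Observable;
import rx.subjects.PublishSubject;

// hypothetical field in MJpegDecoder
private final PublishSubject<byte[]> frameSubject = PublishSubject.create();

/**
 * the decoded frames as images
 *
 * @return an Observable with one BufferedImage per completed JPEG frame
 */
public Observable<BufferedImage> imageObservable() {
  return frameSubject.map(bytes -> {
    try {
      // decode the raw JPEG bytes into an image
      return ImageIO.read(new ByteArrayInputStream(bytes));
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  });
}

In onNext the "// emit the current frame" comment would then become frameSubject.onNext(curFrame), and onCompleted/onError would be forwarded to the subject as well.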