mjpegdecoder на основе rx java не работает должным образом - PullRequest
0 голосов
/ 24 января 2020

Поскольку у меня возникли проблемы с обработкой потока OpenJV mjpeg, например, для камер Logitech, я хотел бы создать обработчик потока mjpeg, используя rx java. Цель состоит в том, чтобы создать Image Observer, который подается из URL-адреса mjpeg. Ниже приведено текущее состояние кода.

Существует несколько проблем, для которых я хотел бы получить решения / ответы:

  1. Почему кажется, что StringObserable возвращает всегда один и тот же первый кадр вместо того, чтобы двигаться в исходном входном потоке в каждой подписке onNext?
  2. Как подписчик может быть преобразован в излучатель для изображений JPeg?

С этими двумя ответами Я полагаю, это может быть хорошим решением. Есть еще мысли?

Модульный тест

  public void testMJpegStream() throws Exception {
    // Dorf Appenzell
    String url="";
    MJpegHandler mjpegHandler=new MJpegHandler(url);
    int bufferSize = 1024 * 64; // 64 KByte Buffer
    MJpegDecoder mjpegDecoder = mjpegHandler.open(bufferSize);


package org.rcdukes.imageview;

import static org.asynchttpclient.Dsl.asyncHttpClient;

import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.concurrent.Future;

import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.Response;
import org.asynchttpclient.handler.BodyDeferringAsyncHandler;
import org.asynchttpclient.handler.BodyDeferringAsyncHandler.BodyDeferringInputStream;

public class MJpegHandler {

  AsyncHttpClient asyncHttpClient;
  private PipedInputStream pipedInputStream;
  private PipedOutputStream pipedOutputStream;
  private BodyDeferringAsyncHandler outputHandler;
  BodyDeferringInputStream inputStream;

  public BodyDeferringInputStream getInputStream() {
    return inputStream;

   * get an mjpeg stream from the given url
   * @param url
   * @return - the MJPeg Stream
   * @throws Exception
  public MJpegHandler(String url) throws Exception {
    // https://stackoverflow.com/a/50402629/1497139
    asyncHttpClient = asyncHttpClient();
    pipedInputStream = new PipedInputStream();
    pipedOutputStream = new PipedOutputStream(
    outputHandler = new BodyDeferringAsyncHandler(
    Future<Response> futureResponse = asyncHttpClient.prepareGet(url)
    Response response = outputHandler.getResponse();
    if (response.getStatusCode() == 200) {
      inputStream=new BodyDeferringAsyncHandler.BodyDeferringInputStream(
          futureResponse, outputHandler, pipedInputStream);

  public MJpegHandler() {

   * open me with the given bufferSize
   * @param url
   * @return
   * @throws Exception
  public MJpegDecoder open(int bufferSize) throws Exception {
    MJpegDecoder mjpegDecoder = new MJpegDecoder(this);
    return mjpegDecoder;

   * close this handler
   * @throws IOException
  public void close() throws IOException {



package org.rcdukes.imageview;

import java.io.ByteArrayOutputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;

import org.asynchttpclient.handler.BodyDeferringAsyncHandler.BodyDeferringInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import rx.Observable;
import rx.Subscriber;
import rx.observables.StringObservable;
import rx.schedulers.Schedulers;

 * reactive MJPegDecoder
 * @author wf
public class MJpegDecoder extends Subscriber<byte[]> {
  protected static final Logger LOG = LoggerFactory

  int prev = 0;
  int cur = 0;
  private ByteArrayOutputStream jpgOut;
  private byte[] curFrame;
  public static boolean debug = false;
  private int bufferIndex = 0;
  private int frameIndex = 0;
  FileOutputStream fos;

  private int bufferSize;

  private Observable<byte[]> mjpegSubscription;

  private MJpegHandler mjpegHandler;

   * open the decoder for the given stream
   * @param mJpegHandler
   * @param bufferSize
  public MJpegDecoder(MJpegHandler mJpegHandler) {
    this.mjpegHandler = mJpegHandler;
    this.curFrame = new byte[0];
    if (debug) {
      try {
        fos = new FileOutputStream("/tmp/decoder.mjpg");
      } catch (FileNotFoundException e) {

  public void onCompleted() {
    try {
      if (fos != null) {
      fos = null;
    } catch (IOException e) {

  public void onError(Throwable e) {

  public void onNext(byte[] buffer) {
    if (debug) {
      String msg = String.format("buffer %6d available %9d kB read",
          ++bufferIndex, bufferIndex * bufferSize / 1024);
      try {
      } catch (IOException e) {

    // loop over all bytes in the buffer
    for (int cur : buffer) {
      // Content-Type: multipart/x-mixed-replace; boundary=
      // will have -- we could detect it here
      if (debug) {
        if (prev == 0x2D && cur == 0x2D) {
          LOG.info("boundary detected");
      // check for JPEG start bytes
      if (prev == 0xFFFFFFFF && cur == 0xFFFFFFD8) {
        if (debug) {
          String msg = String.format("frame %6d started", frameIndex + 1);
        jpgOut = new ByteArrayOutputStream(bufferSize);
        // first byte needs to be written to output
        jpgOut.write((byte) prev);
      // if within the frame write all bytes
      if (jpgOut != null) {
        jpgOut.write((byte) cur);
        // check for JPEG end bytes
        // if found the frame is finished
        if (prev == 0xFFFFFFFF && cur == 0xFFFFFFD9) {
          // create the byte array of the current jpeg frame
          curFrame = jpgOut.toByteArray();
          try {
            jpgOut = null;
          } catch (IOException e) {

          if (debug) {
            String msg = String.format("frame %6d available", ++frameIndex);

          // emit the current frame
      prev = cur;

  private void handle(Throwable th) {
    if (debug)

   * open me with the given bufferSize
   * @param bufferSize
   *          e.g. 64 KByte Buffer - 10.5 msecs/100 FPS at 1920x1080
   *          1000/(1920*1080*3/1024/64)
  public void open(int bufferSize) {
    this.bufferSize = bufferSize;
    BodyDeferringInputStream inputStream = this.mjpegHandler.getInputStream();
    if (inputStream != null) {
      mjpegSubscription = StringObservable.from(inputStream, bufferSize)

   * close me
  public void close() {
