mjpegdecoder на основе rx java не работает должным образом - PullRequest
0 голосов
/ 24 января 2020

Поскольку у меня возникли проблемы с обработкой потока OpenJV mjpeg, например, для камер Logitech, я хотел бы создать обработчик потока mjpeg, используя rx java. Цель состоит в том, чтобы создать Image Observer, который подается из URL-адреса mjpeg. Ниже приведено текущее состояние кода.

Существует несколько проблем, для которых я хотел бы получить решения / ответы:

  1. Почему кажется, что StringObserable возвращает всегда один и тот же первый кадр вместо того, чтобы двигаться в исходном входном потоке в каждой подписке onNext?
  2. Как подписчик может быть преобразован в излучатель для изображений JPeg?

С этими двумя ответами Я полагаю, это может быть хорошим решением. Есть еще мысли?

Модульный тест

  @Test
  public void testMJpegStream() throws Exception {
    // Dorf Appenzell
    String url="http://213.193.89.202/axis-cgi/mjpg/video.cgi";
    //url="http://localhost:8081?type=simulator&mode=stream";
    //url="http://picarford:8080/?action=stream";
    MJpegHandler mjpegHandler=new MJpegHandler(url);
    MJpegDecoder.debug=true;
    int bufferSize = 1024 * 64; // 64 KByte Buffer
    MJpegDecoder mjpegDecoder = mjpegHandler.open(bufferSize);
    Thread.sleep(1000);
    mjpegDecoder.close();
  }

MJpegHandler

package org.rcdukes.imageview;

import static org.asynchttpclient.Dsl.asyncHttpClient;

import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.concurrent.Future;

import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.Response;
import org.asynchttpclient.handler.BodyDeferringAsyncHandler;
import org.asynchttpclient.handler.BodyDeferringAsyncHandler.BodyDeferringInputStream;

public class MJpegHandler {

  AsyncHttpClient asyncHttpClient;
  private PipedInputStream pipedInputStream;
  private PipedOutputStream pipedOutputStream;
  private BodyDeferringAsyncHandler outputHandler;
  BodyDeferringInputStream inputStream;

  public BodyDeferringInputStream getInputStream() {
    return inputStream;
  }

  /**
   * get an mjpeg stream from the given url
   * 
   * @param url
   * @return - the MJPeg Stream
   * @throws Exception
   */
  public MJpegHandler(String url) throws Exception {
    // https://stackoverflow.com/a/50402629/1497139
    asyncHttpClient = asyncHttpClient();
    asyncHttpClient.prepareGet(url);
    pipedInputStream = new PipedInputStream();
    pipedOutputStream = new PipedOutputStream(
        pipedInputStream);
    outputHandler = new BodyDeferringAsyncHandler(
        pipedOutputStream);
    Future<Response> futureResponse = asyncHttpClient.prepareGet(url)
        .execute(outputHandler);
    Response response = outputHandler.getResponse();
    if (response.getStatusCode() == 200) {
      inputStream=new BodyDeferringAsyncHandler.BodyDeferringInputStream(
          futureResponse, outputHandler, pipedInputStream);
    } 
  }

  public MJpegHandler() {
  }

  /**
   * open me with the given bufferSize
   * 
   * @param url
   * @return
   * @throws Exception
   */
  public MJpegDecoder open(int bufferSize) throws Exception {
    MJpegDecoder mjpegDecoder = new MJpegDecoder(this);
    mjpegDecoder.open(bufferSize);
    return mjpegDecoder;
  }

  /**
   * close this handler
   * @throws IOException
   */
  public void close() throws IOException {
     this.asyncHttpClient.close();
  }

}

MJpegDecoder

package org.rcdukes.imageview;

import java.io.ByteArrayOutputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;

import org.asynchttpclient.handler.BodyDeferringAsyncHandler.BodyDeferringInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import rx.Observable;
import rx.Subscriber;
import rx.observables.StringObservable;
import rx.schedulers.Schedulers;

/**
 * reactive MJPegDecoder
 * 
 * @author wf
 *
 */
public class MJpegDecoder extends Subscriber<byte[]> {
  protected static final Logger LOG = LoggerFactory
      .getLogger(MJpegDecoder.class);

  int prev = 0;
  int cur = 0;
  private ByteArrayOutputStream jpgOut;
  private byte[] curFrame;
  public static boolean debug = false;
  private int bufferIndex = 0;
  private int frameIndex = 0;
  FileOutputStream fos;

  private int bufferSize;

  private Observable<byte[]> mjpegSubscription;

  private MJpegHandler mjpegHandler;

  /**
   * open the decoder for the given stream
   * 
   * @param mJpegHandler
   * @param bufferSize
   */
  public MJpegDecoder(MJpegHandler mJpegHandler) {
    this.mjpegHandler = mJpegHandler;
    this.curFrame = new byte[0];
    if (debug) {
      try {
        fos = new FileOutputStream("/tmp/decoder.mjpg");
      } catch (FileNotFoundException e) {
        handle(e);
      }
    }
  }

  @Override
  public void onCompleted() {
    try {
      this.mjpegHandler.close();
      if (fos != null) {
        fos.close();
      }
      fos = null;
    } catch (IOException e) {
      onError(e);
    }
  }

  @Override
  public void onError(Throwable e) {
    handle(e);
  }

  @Override
  public void onNext(byte[] buffer) {
    if (debug) {
      String msg = String.format("buffer %6d available %9d kB read",
          ++bufferIndex, bufferIndex * bufferSize / 1024);
      LOG.info(msg);
      try {
        fos.write(curFrame);
        fos.flush();
      } catch (IOException e) {
        handle(e);
      }

    }
    // loop over all bytes in the buffer
    for (int cur : buffer) {
      // Content-Type: multipart/x-mixed-replace; boundary=
      // will have -- we could detect it here
      if (debug) {
        if (prev == 0x2D && cur == 0x2D) {
          LOG.info("boundary detected");
        }
      }
      // check for JPEG start bytes
      if (prev == 0xFFFFFFFF && cur == 0xFFFFFFD8) {
        if (debug) {
          String msg = String.format("frame %6d started", frameIndex + 1);
          LOG.info(msg);
        }
        jpgOut = new ByteArrayOutputStream(bufferSize);
        // first byte needs to be written to output
        jpgOut.write((byte) prev);
      }
      // if within the frame write all bytes
      if (jpgOut != null) {
        jpgOut.write((byte) cur);
        // check for JPEG end bytes
        // if found the frame is finished
        if (prev == 0xFFFFFFFF && cur == 0xFFFFFFD9) {
          // create the byte array of the current jpeg frame
          curFrame = jpgOut.toByteArray();
          try {
            jpgOut.close();
            jpgOut = null;
          } catch (IOException e) {
            onError(e);
          }

          if (debug) {
            String msg = String.format("frame %6d available", ++frameIndex);
            LOG.info(msg);
          }

          // emit the current frame
        }
      }
      prev = cur;
    }
  }

  private void handle(Throwable th) {
    LOG.error(th.getMessage());
    if (debug)
      th.printStackTrace();
  }

  /**
   * open me with the given bufferSize
   * 
   * @param bufferSize
   *          e.g. 64 KByte Buffer - 10.5 msecs/100 FPS at 1920x1080
   *          1000/(1920*1080*3/1024/64)
   */
  public void open(int bufferSize) {
    this.bufferSize = bufferSize;
    BodyDeferringInputStream inputStream = this.mjpegHandler.getInputStream();
    if (inputStream != null) {
      mjpegSubscription = StringObservable.from(inputStream, bufferSize)
          .subscribeOn(Schedulers.io()).observeOn(Schedulers.newThread());
      mjpegSubscription.subscribe(this);
    }
  }

  /**
   * close me
   */
  public void close() {
    this.unsubscribe();
    this.onCompleted();
  }

}
...