Как получать фрагментированные ответы в HTTP / 1.1 при одновременной отправке данных на сервер в Java / Android - PullRequest
2 голосов
/ 15 февраля 2020

Мы создаем приложение, в котором оно записывает голос пользователя в режиме реального времени и отправляет записанные данные на сервер через HTTP-запрос. В то время как сервер обрабатывает данные в реальном времени, он также отправляет ответы в виде чанков. Проще говоря, приложение отправляет данные по частям на сервер, и в то же время оно также получает по частям ответы от сервера.

Пожалуйста, НЕ говорите мне, что это невозможно потому что у меня есть рабочий пример в iOS, который использует URLSession с uploadTask, используя пару потоков для отправки данных в режиме реального времени на сервер, а затем получает ответы по частям от этого обратного вызова urlSession(_ session: URLSession, dataTask: URLSessionDataTask, didReceive data: Data).

Ниже мой код в Java. Я получил отправку, но ответ получен только после завершения отправки.

RequestBody body = new RequestBody() 
{
    @Override 
    public MediaType contentType() 
    {
        return MediaType.get("application/octet-stream");
    }

    @Override 
    public void writeTo(BufferedSink sink) throws IOException 
    {
        String filename = "/path/spoken.pcm";

        try 
        {
            InputStream inputStream = new DataInputStream(new FileInputStream(new File(filename)));

            byte[] cacheBytes = new byte[320];

            int length;

            while((length = inputStream.read(cacheBytes, 0, cacheBytes.length)) != -1) 
            {
                System.out.println("write thread name: " + Thread.currentThread());

                sink.write(cacheBytes, 0, length);

                Thread.sleep(10);
            }

            inputStream.close();
        } 

        catch(IOException | InterruptedException e) 
        {
            e.printStackTrace();
        }
    }
};

Request request = new Request.Builder()
            .url("www.server.com")
            .post(body)
            .build();

Interceptor interceptor = new Interceptor()
{
    @Override
    public Response intercept(Chain chain) throws IOException 
    {
        System.out.println("intercept!!!");

        CountDownLatch latch = new CountDownLatch(1);

        Response response = chain.proceed(chain.request());
        BufferedSource source = response.body().source();

        System.out.println("got response body !!!!");

        new Thread(new Runnable()
        {
            @Override
            public void run() 
            {
                byte[] cachedBytes = new byte[512];

                try 
                {
                    while(!source.exhausted())
                    {
                        int length = source.read(cachedBytes);
                        byte[] partialBytes = new byte[length];

                        System.arraycopy(cachedBytes, 0, partialBytes, 0, length);

                        System.out.println("partial response received: " + getHexString(partialBytes));
                    }
                } 

                catch (IOException e) 
                {
                    e.printStackTrace();
                }

                latch.countDown();
            }
        }).start();

        try 
        {
            latch.await();
        } 

        catch (InterruptedException e) 
        {
            e.printStackTrace();
        }

        return response;
    }
};

httpClient = new OkHttpClient.Builder()
            .addInterceptor(interceptor)
            .build();

httpClient.newCall(request).enqueue(new Callback() 
{
    @Override
    public void onFailure(Call call, IOException e) 
    {
        e.printStackTrace();
    }

    @Override
    public void onResponse(Call call, Response response) throws IOException 
    {
        try(ResponseBody responseBody = response.body()) 
        {
            if(!response.isSuccessful()) throw new IOException("Unexpected code " + response);

            System.out.println("all responses received!");
        }
    }
});

Этот журнал: System.out.println("got response body !!!!"); отображается только после завершения отправки всех данных на сервер. Это означает, что когда writeTo(BufferedSink sink) возвращается, я получаю ответ в обратном вызове перехватчика порциями, а затем вызывается обратный вызов onResponse(Call call, Response response).

Что мне нужно, так это то, что когда я отправляю данные, я хочу иметь возможность получать чанкованные ответы одновременно.

Ответы [ 3 ]

3 голосов
/ 15 февраля 2020

Для HTTP / 1.1 OkHttp не вернет тело ответа, пока тело запроса не завершит передачу. Для HTTP / 2 вы можете переопределить isDuplex () , чтобы вернуть true.

0 голосов
/ 05 марта 2020

ОК, глядя везде, эта реализация не является стандартным способом, как другие говорят о HTTP. Хотя HTTP не ограничивает вас от отправки и получения одновременно в одном запросе, поскольку я уже достиг этого, используя приложение iOS в качестве клиента с сервлетом Java HTTP, проблема в том, что большинство библиотек не поддерживают эту функцию. поведение. Вы можете написать свой собственный SDK для этого на стороне клиента, но вы также должны учитывать риск в будущем, если вы планируете использовать балансировщики нагрузки и обратные прокси-серверы для своего сервера, и они могут также не поддерживать это поведение.

Итак, мое самое быстрое решение для достижения того же эффекта - использовать HTTP для отправки запроса, но ответ будет отправлен клиенту через MQTT.

0 голосов
/ 26 февраля 2020

Вы можете попробовать реализовать сокет, который будет оставаться открытым и продолжать слушать, пока приложение не закроется. Таким образом, ответы от вашего сервера могут влить, и вам просто нужно проанализировать его и получить то, что вам нужно. Java Сокет позволит вам сделать такую ​​реализацию. Java Сокет позволяет вам независимо обрабатывать входной поток и выходной поток. В вашем случае входной поток останется открытым навсегда.

Проблема с вышеприведенным решением заключается в том, что вы отправляете запрос на сервер. Запрос завершается с помощью обратного вызова, как только сервер отвечает (200 OK, 401 ... et c). Имейте в виду, что более поздний механизм c является асинхронным и не имеет состояния.

...