передача потока Akka в вышестоящий сервис для заполнения - PullRequest
9 голосов
/ 05 апреля 2020

Мне нужно вызвать вышестоящую службу (Azure Служба BLOB-объектов), чтобы передать sh данные в OutputStream, а затем мне нужно развернуться и отправить sh обратно клиенту через akka. Без akka (и просто кода сервлета) я просто получил бы ServletOutputStream и передал его методу сервиса azure.

Ближайший, на которого я могу попытаться наткнуться, и, очевидно, это неправильно, это что-то как это

        Source<ByteString, OutputStream> source = StreamConverters.asOutputStream().mapMaterializedValue(os -> {
            blobClient.download(os);
            return os;
        });

        ResponseEntity resposeEntity = HttpEntities.create(ContentTypes.APPLICATION_OCTET_STREAM, preAuthData.getFileSize(), source);

        sender().tell(new RequestResult(resposeEntity, StatusCodes.OK), self());

Идея в том, что я вызываю вышестоящий сервис для получения выходного потока, заполняемого вызовом blobClient.download (os);

Кажется, что вызывается лямбда-функция и возвращается, но потом не удается, потому что нет данных или чего-то еще. Как будто я не должен иметь эту лямбда-функцию, но возможно вернуть какой-то объект, который работает? Не уверен.

Как это сделать?

Ответы [ 2 ]

2 голосов
/ 28 апреля 2020

Настоящая проблема здесь заключается в том, что API Azure не предназначен для противодавления. Выходной поток не может сообщить обратно Azure, что он не готов для получения дополнительных данных. Иными словами: если Azure отправляет данные быстрее, чем вы можете их использовать, то где-то должен произойти какой-то ужасный сбой переполнения буфера.

Принятие этого факта - следующая лучшая вещь, которую мы можем сделать is:

  • Используйте Source.lazySource, чтобы начать загрузку данных только при наличии потребности в нисходящем направлении (иначе, источник запускается и данные запрашиваются).
  • Поместите download вызвать другой поток, чтобы он продолжал выполняться, не блокируя возвращение источника. Один из способов сделать это - Future (я не уверен, что такое Java лучшие практики, но в любом случае должен работать нормально). Хотя изначально это не имеет значения, вам может потребоваться выбрать контекст выполнения, отличный от system.dispatcher - все зависит от того, блокирует download или нет.

Я заранее извиняюсь, если это Java код искажен - я использую Akka с Scala, так что это все из-за того, что я смотрю на Akka Java API и Java ссылку на синтаксис.

ResponseEntity responseEntity = HttpEntities.create(
  ContentTypes.APPLICATION_OCTET_STREAM,
  preAuthData.getFileSize(),

  // Wait until there is downstream demand to intialize the source...
  Source.lazySource(() -> {
    // Pre-materialize the outputstream before the source starts running
    Pair<OutputStream, Source<ByteString, NotUsed>> pair =
      StreamConverters.asOutputStream().preMaterialize(system);

    // Start writing into the download stream in a separate thread
    Futures.future(() -> { blobClient.download(pair.first()); return pair.first(); }, system.getDispatcher());

    // Return the source - it should start running since `lazySource` indicated demand
    return pair.second();
  })
);

sender().tell(new RequestResult(responseEntity, StatusCodes.OK), self());
1 голос
/ 15 апреля 2020

OutputStream в этом случае является «материализованным значением» Source, и оно будет создано только после запуска потока (или «материализации» в текущий поток). Запуск его вне вашего контроля, так как вы передаете Source в Akka HTTP, и это позже фактически запустит ваш источник.

.mapMaterializedValue(matval -> ...) обычно используется для преобразования материализованного значения, но так как оно вызывается как В качестве части материализации вы можете использовать это для создания побочных эффектов, таких как отправка матвала в сообщении, как вы уже поняли, в этом нет ничего плохого, даже если это выглядит странно. Важно понимать, что поток не завершит свою материализацию и начнет работать до тех пор, пока лямбда не завершится. Это означает проблемы, если download() блокирует, а не отбрасывает какую-то работу в другом потоке и немедленно возвращает ее.

Однако существует другое решение: Source.preMaterialize(), оно материализует источник и дает вам Pair материализованного значения и нового Source, который может использоваться для использования уже запущенного источника:

Pair<OutputStream, Source<ByteString, NotUsed>> pair = 
  StreamConverters.asOutputStream().preMaterialize(system);
OutputStream os = pair.first();
Source<ByteString, NotUsed> source = pair.second();

Обратите внимание, что в вашем коде есть несколько дополнительных вещей, которые следует учитывать, особенно если blobClient.download(os) вызывайте блоки до тех пор, пока это не будет сделано, и вы вызываете это от актера, в этом случае вы должны убедиться, что ваш актер не морит голодом диспетчер и не останавливать выполнение других акторов в вашем приложении (см. Документы Akka: https://doc.akka.io/docs/akka/current/typed/dispatchers.html#blocking -needs-осторожно-управление ).

...