Source.queue с cachedHostConnectionPool в Java - PullRequest
       24

Source.queue с cachedHostConnectionPool в Java

1 голос
/ 03 августа 2020

Я пытаюсь использовать source.queue с cachedHostConnectionPool в java, но не могу получить правильный вывод. Ниже представлена ​​моя попытка. Я всегда получаю успешный ответ, и я понимаю, почему, но не уверен, как это сделать.

Причина, по которой я получаю успешный ответ, заключается в том, что я определил Try как HttpResponse.create (), который всегда дает мне успешный ответ, но я сомневаюсь, почему он не перезаписывается, когда я делаю queue.offer (Pair.create (httprequest, обещание)). Может кто-нибудь, пожалуйста, помогите мне понять, что я здесь делаю не так.

// Method to return queue 
  public SourceQueueWithComplete<Object> httpGetRequestWithPool
          (Materializer materializer) 
{
    final Http http = Http.get(system);
     final Flow<Pair<HttpRequest, Object>, Pair<Try<HttpResponse>, Object>, HostConnectionPool> flow;
      flow = http.cachedHostConnectionPool(ConnectHttp.toHost("host:port"));
    final SourceQueueWithComplete<Object> queue= Source.queue(3,OverflowStrategy.dropNew())
                 .map(d -> Pair.create(HttpRequest.create("contextpath"),d))
                 .via(flow)
                 .toMat(Sink.foreach(p -> p.first()), Keep.left())
                 .run(materializer);
    return queue;
  }

// Here i am trying to use.

  SourceQueueWithComplete<Object> queue = util.httpGetRequestWithPool(materializer);
        Source<Object, NotUsed> source = Source.from(ListOfObject);
        source.mapAsyncUnordered(3,x-> {
            String url = util.getServiceUrl("servicename").append(x.getId).toString();
            HttpRequest httpRequest = HttpRequest.create(url);
            Try<HttpResponse> promise = Try.apply(()->HttpResponse.create());
            queue.offer(Pair.create(httpRequest,promise))
                    .thenCompose(result -> {
                        if(result instanceof QueueOfferResult.Enqueued$){
                         return CompletableFuture.completedFuture(promise)
                                    .thenApply(res ->{
                                       if(res.get().status().intValue()==200){
                                           System.out.println("success");
                                       }
                                       return res;
                                    });
                        }
                        else{
                            return CompletableFuture.completedFuture(HttpResponse.create().withStatus(StatusCodes.SERVICE_UNAVAILABLE));

                        }
                    });
            return null;
        }).run(materializer);
```
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...