Как добавить пакетную неявную для клиента? - PullRequest
7 голосов
/ 02 ноября 2019

Давайте рассмотрим следующий код:

Код клиента:

public class MyClient {
    private final MyClientSideService myClientSideService;

    public MyClient(MyClientSideService myClientSideService) {
        this.myClientSideService = myClientSideService;
    }

    public String requestRow(Integer req) {
        return myClientSideService.requestSingleRow(req);
    }
}

Обслуживание на стороне клиента:

public class MyClientSideService {
    private final MyServerSideService myServerSideService;

    public MyClientSideService(MyServerSideService myServerSideService) {
        this.myServerSideService = myServerSideService;
    }

    public String requestSingleRow(int req) {
        return myServerSideService.requestRowBatch(Arrays.asList(req)).get(0);
    }
}

Служба на стороне сервера:

@Slf4j
public class MyServerSideService {
    //single threaded bottleneck service
    public synchronized List<String> requestRowBatch(List<Integer> batchReq) {
        log.info("Req for {} started");
        try {
            Thread.sleep(100);
            return batchReq.stream().map(String::valueOf).collect(Collectors.toList());

        } catch (InterruptedException e) {
            return null;
        } finally {
            log.info("Req for {} finished");

        }
    }
}

И основной:

@Slf4j
public class MainClass {
    public static void main(String[] args) {
        MyClient myClient = new MyClient(new MyClientSideService(new MyServerSideService()));
        for (int i = 0; i < 20; i++) {
            new Thread(() -> {
                for (int m = 0; m < 100; m++) {
                    int k = m;
                    log.info("Response is {}", myClient.requestRow(k));
                }
            }).start();
        }
    }
}

Согласно журналам это занимает приблизительно 4 мин 22 сек, но этоперебор. Я думаю, что это может быть значительно улучшено. Я хотел бы реализовать неявное пакетирование. Таким образом, MyClientSideService должен собирать запросы, и когда он становится равным 50 (это предварительно сконфигурированный размер пакета) или истек некоторый предварительно сконфигурированный тайм-аут, то для запроса MyServerSideService и обратного результата маршрутизации клиентам. Протокол должен быть синхронным, поэтому клиенты должны быть заблокированы до получения результата.

Я пытался написать код, используя CountDownLatch es и CyclicBarrier s, но мои попытки были далеки от успеха.

Как мне достичь своей цели?

PS

Если заменить requestRowBatch тип возврата List<String> с на Map<Integer, String>, чтобы делегировать запрос и сопоставление ответа на серверследующие работы с ограничениями. Это работает, только если я отправляю <= 25 запросов </p>

@Slf4j
public class MyClientSideService {
    private final Integer batchSize = 25;
    private final Integer maxTimeoutMillis = 5000;
    private final MyServerSideService myServerSideService;
    private final Queue<Integer> queue = new ArrayBlockingQueue(batchSize);
    private final Map<Integer, String> responseMap = new ConcurrentHashMap();
    private final AtomicBoolean started = new AtomicBoolean();

    private CountDownLatch startBatchRequestLatch = new CountDownLatch(batchSize);
    private CountDownLatch awaitBatchResponseLatch = new CountDownLatch(1);


    public MyClientSideService(MyServerSideService myServerSideService) {
        this.myServerSideService = myServerSideService;
    }

    public String requestSingleRow(int req) {
        queue.offer(req);
        if (!started.compareAndExchange(false, true)) {
            log.info("Start batch collecting");
            startBatchCollecting();
        }
        startBatchRequestLatch.countDown();
        try {
            log.info("Awaiting batch response latch for {}...", req);
            awaitBatchResponseLatch.await();
            log.info("Finished awaiting batch response latch for {}...", req);
            return responseMap.get(req);
        } catch (InterruptedException e) {
            e.printStackTrace();
            return "EXCEPTION";
        }
    }

    private void startBatchCollecting() {
        new Thread(() -> {
            try {
                log.info("Await startBatchRequestLatch");
                startBatchRequestLatch.await(maxTimeoutMillis, TimeUnit.MILLISECONDS);
                log.info("await of startBatchRequestLatch finished");

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            responseMap.putAll(requestBatch(queue));
            log.info("Releasing batch response latch");
            awaitBatchResponseLatch.countDown();

        }).start();
    }

    public Map<Integer, String> requestBatch(Collection<Integer> requestList) {

        return myServerSideService.requestRowBatch(requestList);
    }
}

Обновление

В соответствии с ответом на солод я смог разработать следующее:

@Slf4j
public class MyClientSideServiceCompletableFuture {
    private final Integer batchSize = 25;
    private final Integer maxTimeoutMillis = 5000;
    private final MyServerSideService myServerSideService;
    private final Queue<Pair<Integer, CompletableFuture>> queue = new ArrayBlockingQueue(batchSize);
    private final AtomicInteger counter = new AtomicInteger(0);
    private final Lock lock = new ReentrantLock();

    public MyClientSideServiceCompletableFuture(MyServerSideService myServerSideService) {
        this.myServerSideService = myServerSideService;
    }

    public String requestSingleRow(int req) {
        CompletableFuture<String> future = new CompletableFuture<>();
        lock.lock();
        try {
            queue.offer(Pair.of(req, future));
            int counter = this.counter.incrementAndGet();
            if (counter != 0 && counter % batchSize == 0) {
                log.info("request");
                List<Integer> requests = queue.stream().map(p -> p.getKey()).collect(Collectors.toList());
                Map<Integer, String> serverResponseMap = requestBatch(requests);
                queue.forEach(pair -> {
                    String response = serverResponseMap.get(pair.getKey());
                    CompletableFuture<String> value = pair.getValue();
                    value.complete(response);
                });
                queue.clear();
            }
        } finally {
            lock.unlock();
        }
        try {
            return future.get();
        } catch (Exception e) {
            return "Exception";
        }
    }


    public Map<Integer, String> requestBatch(Collection<Integer> requestList) {

        return myServerSideService.requestRowBatch(requestList);
    }
}

Но это не такработать, если размер не кратен размеру партии

Ответы [ 3 ]

0 голосов
/ 06 ноября 2019

Если заменить тип возврата requestRowBatch с List<String> на Map<Integer, String> для делегирования запроса и отображения ответа на сервер, я смог создать следующее решение:

@Slf4j
public class MyClientSideServiceCompletableFuture {
    private final Integer batchSize = 25;
    private final Integer timeoutMillis = 5000;
    private final MyServerSideService myServerSideService;
    private final BlockingQueue<Pair<Integer, CompletableFuture>> queue = new LinkedBlockingQueue<>();

    private final Lock lock = new ReentrantLock();
    private final Condition requestAddedCondition = lock.newCondition();


    public MyClientSideServiceCompletableFuture(MyServerSideService myServerSideService) {
        this.myServerSideService = myServerSideService;
        startQueueDrainer();
    }

    public String requestSingleRow(int req) {
        CompletableFuture<String> future = new CompletableFuture<>();
        while (!queue.offer(Pair.of(req, future))) {
            log.error("Can't add {} to the queue. Retrying...", req);
        }
        lock.lock();
        try {
            requestAddedCondition.signal();
        } finally {
            lock.unlock();
        }
        try {
            return future.get();
        } catch (Exception e) {
            return "Exception";
        }
    }

    private void startQueueDrainer() {
        new Thread(() -> {
            log.info("request");
            while (true) {
                ArrayList<Pair<Integer, CompletableFuture>> requests = new ArrayList<>();
                if (queue.drainTo(requests, batchSize) > 0) {
                    log.info("drained {} items", requests.size());
                    Map<Integer, String> serverResponseMap = requestBatch(requests.stream().map(Pair::getKey).collect(Collectors.toList()));
                    requests.forEach(pair -> {
                        String response = serverResponseMap.get(pair.getKey());
                        CompletableFuture<String> value = pair.getValue();
                        value.complete(response);
                    });
                } else {
                    lock.lock();
                    try {
                        while (queue.size() == 0) {
                            try {
                                log.info("Waiting on condition");
                                requestAddedCondition.await(timeoutMillis, TimeUnit.MILLISECONDS);
                                log.info("Waking up on condition");
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    } finally {
                        lock.unlock();
                    }
                }

            }
        }).start();
    }


    public Map<Integer, String> requestBatch(Collection<Integer> requestList) {
        return myServerSideService.requestRowBatch(requestList);
    }
}

Это выглядит как рабочее решение. Но я не уверен, что это оптимально.

0 голосов
/ 07 ноября 2019

Ваше решение MyClientSideServiceCompletableFuture будет отправлять запросы на сервер каждый раз, когда вы добавляете что-то в очередь, и не ждет, пока запросы будут иметь размер пакета. Вы используете BlockingQueue и добавляете ненужные условия блокировки и блокировки. BlockingQueue имеет ограничения по тайм-ауту блокировки, поэтому нет необходимости в дополнительном условии.

Вы можете упростить свое решение, например так:

Он отправляет запросы на сервер только тогда, когда пакет заполнен или время ожидания истекло и пакетне является пустым.

private void startQueueDrainer() {

        new Thread(() -> {
            log.info("request");
            ArrayList<Pair<Integer, CompletableFuture>> batch = new ArrayList<>(batchSize);
            while (true) {
                try {
                    batch.clear(); //clear batch
                    long timeTowWait = timeoutMillis;
                    long startTime = System.currentTimeMillis();

                    while (timeTowWait > 0 && batch.size() < batchSize) {
                        Pair<Integer, CompletableFuture> request = queue.poll(timeTowWait , TimeUnit.MILLISECONDS);
                        if(request != null){
                          batch.add(request);
                        }
                        long timeSpent = (System.currentTimeMillis() - startTime);
                        timeTowWait = timeTowWait - timeSpent;
                    }

                    if (!batch.isEmpty()) {
                        // we wait at least timeoutMillis or batch is full
                        log.info("send {} requests to server", batch.size());
                        Map<Integer, String> serverResponseMap = requestBatch(batch.stream().map(Pair::getKey).collect(Collectors.toList()));
                        batch.forEach(pair -> {
                            String response = serverResponseMap.get(pair.getKey());
                            CompletableFuture<String> value = pair.getValue();
                            value.complete(response);
                        });
                    } else {
                        log.info("We wait {} but the batch is still empty", System.currentTimeMillis() - startTime);
                    }

                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }

Измените метод requestSingleRow, чтобы не использовать блокировку

  public String requestSingleRow(int req) {
        CompletableFuture<String> future = new CompletableFuture<>();

        while (!queue.offer(Pair.of(req, future))) {
            log.error("Can't add {} to the queue. Retrying...", req);
        }

        try {
            return future.get();
        } catch (Exception e) {
            return "Exception";
        }
    }
0 голосов
/ 02 ноября 2019

Вы можете использовать CompletableFuture.
Потоки, вызывающие MyClientSideService, помещают свои запросы в Queue (возможно, BlockingQueue, и получают взамен новое CompletableFuture. Вызывающий поток может вызвать CompletableFuture.get() блокировать, пока результат не будет готов, или продолжать выполнять другие действия.

что CompletableFuture будет храниться вместе с запросом в MyClientSideService. Когда вы достигнете 50 запросов (и, следовательно, 50 CompletableFuture)экземпляры), чтобы клиентская служба отправила пакетный запрос.

Когда запрос будет завершен, используйте метод CompletableFuture.complete(value) каждого экземпляра ComplatableFuture в очереди, чтобы уведомить поток клиента о готовности ответа. Это разблокирует клиент, если он вызвал метод блокировки, такой как CompletableFuture.get(), или заставит его немедленно вернуться со значением, если он будет вызван позже.

...