Давайте рассмотрим следующий код:
Код клиента:
public class MyClient {
private final MyClientSideService myClientSideService;
public MyClient(MyClientSideService myClientSideService) {
this.myClientSideService = myClientSideService;
}
public String requestRow(Integer req) {
return myClientSideService.requestSingleRow(req);
}
}
Обслуживание на стороне клиента:
public class MyClientSideService {
private final MyServerSideService myServerSideService;
public MyClientSideService(MyServerSideService myServerSideService) {
this.myServerSideService = myServerSideService;
}
public String requestSingleRow(int req) {
return myServerSideService.requestRowBatch(Arrays.asList(req)).get(0);
}
}
Служба на стороне сервера:
@Slf4j
public class MyServerSideService {
//single threaded bottleneck service
public synchronized List<String> requestRowBatch(List<Integer> batchReq) {
log.info("Req for {} started");
try {
Thread.sleep(100);
return batchReq.stream().map(String::valueOf).collect(Collectors.toList());
} catch (InterruptedException e) {
return null;
} finally {
log.info("Req for {} finished");
}
}
}
И основной:
@Slf4j
public class MainClass {
public static void main(String[] args) {
MyClient myClient = new MyClient(new MyClientSideService(new MyServerSideService()));
for (int i = 0; i < 20; i++) {
new Thread(() -> {
for (int m = 0; m < 100; m++) {
int k = m;
log.info("Response is {}", myClient.requestRow(k));
}
}).start();
}
}
}
Согласно журналам это занимает приблизительно 4 мин 22 сек, но этоперебор. Я думаю, что это может быть значительно улучшено. Я хотел бы реализовать неявное пакетирование. Таким образом, MyClientSideService
должен собирать запросы, и когда он становится равным 50 (это предварительно сконфигурированный размер пакета) или истек некоторый предварительно сконфигурированный тайм-аут, то для запроса MyServerSideService
и обратного результата маршрутизации клиентам. Протокол должен быть синхронным, поэтому клиенты должны быть заблокированы до получения результата.
Я пытался написать код, используя CountDownLatch
es и CyclicBarrier
s, но мои попытки были далеки от успеха.
Как мне достичь своей цели?
PS
Если заменить requestRowBatch
тип возврата List<String>
с на Map<Integer, String>
, чтобы делегировать запрос и сопоставление ответа на серверследующие работы с ограничениями. Это работает, только если я отправляю <= 25 запросов </p>
@Slf4j
public class MyClientSideService {
private final Integer batchSize = 25;
private final Integer maxTimeoutMillis = 5000;
private final MyServerSideService myServerSideService;
private final Queue<Integer> queue = new ArrayBlockingQueue(batchSize);
private final Map<Integer, String> responseMap = new ConcurrentHashMap();
private final AtomicBoolean started = new AtomicBoolean();
private CountDownLatch startBatchRequestLatch = new CountDownLatch(batchSize);
private CountDownLatch awaitBatchResponseLatch = new CountDownLatch(1);
public MyClientSideService(MyServerSideService myServerSideService) {
this.myServerSideService = myServerSideService;
}
public String requestSingleRow(int req) {
queue.offer(req);
if (!started.compareAndExchange(false, true)) {
log.info("Start batch collecting");
startBatchCollecting();
}
startBatchRequestLatch.countDown();
try {
log.info("Awaiting batch response latch for {}...", req);
awaitBatchResponseLatch.await();
log.info("Finished awaiting batch response latch for {}...", req);
return responseMap.get(req);
} catch (InterruptedException e) {
e.printStackTrace();
return "EXCEPTION";
}
}
private void startBatchCollecting() {
new Thread(() -> {
try {
log.info("Await startBatchRequestLatch");
startBatchRequestLatch.await(maxTimeoutMillis, TimeUnit.MILLISECONDS);
log.info("await of startBatchRequestLatch finished");
} catch (InterruptedException e) {
e.printStackTrace();
}
responseMap.putAll(requestBatch(queue));
log.info("Releasing batch response latch");
awaitBatchResponseLatch.countDown();
}).start();
}
public Map<Integer, String> requestBatch(Collection<Integer> requestList) {
return myServerSideService.requestRowBatch(requestList);
}
}
Обновление
В соответствии с ответом на солод я смог разработать следующее:
@Slf4j
public class MyClientSideServiceCompletableFuture {
private final Integer batchSize = 25;
private final Integer maxTimeoutMillis = 5000;
private final MyServerSideService myServerSideService;
private final Queue<Pair<Integer, CompletableFuture>> queue = new ArrayBlockingQueue(batchSize);
private final AtomicInteger counter = new AtomicInteger(0);
private final Lock lock = new ReentrantLock();
public MyClientSideServiceCompletableFuture(MyServerSideService myServerSideService) {
this.myServerSideService = myServerSideService;
}
public String requestSingleRow(int req) {
CompletableFuture<String> future = new CompletableFuture<>();
lock.lock();
try {
queue.offer(Pair.of(req, future));
int counter = this.counter.incrementAndGet();
if (counter != 0 && counter % batchSize == 0) {
log.info("request");
List<Integer> requests = queue.stream().map(p -> p.getKey()).collect(Collectors.toList());
Map<Integer, String> serverResponseMap = requestBatch(requests);
queue.forEach(pair -> {
String response = serverResponseMap.get(pair.getKey());
CompletableFuture<String> value = pair.getValue();
value.complete(response);
});
queue.clear();
}
} finally {
lock.unlock();
}
try {
return future.get();
} catch (Exception e) {
return "Exception";
}
}
public Map<Integer, String> requestBatch(Collection<Integer> requestList) {
return myServerSideService.requestRowBatch(requestList);
}
}
Но это не такработать, если размер не кратен размеру партии