Как уже упоминалось в комментариях, это может быть хорошим случаем для gpars, который отлично подходит для подобных сценариев.
Это на самом деле не столько к Groovy, сколько к асинхронному программированию на Java и на JVM в целом.
Если вы хотите придерживаться параллельных идиом Java, я собрал фрагмент кода, который вы могли бы использовать в качестве потенциальной отправной точки. Это не было проверено и крайние случаи не были рассмотрены. Я написал это для удовольствия, и поскольку это асинхронное программирование, и я не потратил достаточно времени на размышления об этом, я подозреваю, что там есть отверстия, достаточно большие, чтобы пропустить танк.
Как говорится, здесь приведен код, который делает попытку пакетирования запросов:
import java.util.concurrent.*
import java.util.concurrent.locks.*
// test code
def client = new Client()
client.start()
def futureResponses = []
1000.times {
futureResponses << client.add(it as String)
}
client.stop()
futureResponses.each { futureResponse ->
// resolve future...will wait if the batch has not completed yet
def response = futureResponse.get()
println "received response with index ${response.responseIndex}"
}
// end of test code
class FutureResponse extends CompletableFuture<String> {
String args
}
class Client {
int minMillisLullToSubmitBatch = 100
int maxBatchSizeBeforeSubmit = 100
int millisBetweenChecks = 10
long lastAddTime = Long.MAX_VALUE
def batch = []
def lock = new ReentrantLock()
boolean running = true
def start() {
running = true
Thread.start {
while (running) {
checkForSubmission()
sleep millisBetweenChecks
}
}
}
def stop() {
running = false
checkForSubmission()
}
def withLock(Closure c) {
try {
lock.lock()
c.call()
} finally {
lock.unlock()
}
}
FutureResponse add(String args) {
def future = new FutureResponse(args: args)
withLock {
batch << future
lastAddTime = System.currentTimeMillis()
}
future
}
def checkForSubmission() {
withLock {
if (System.currentTimeMillis() - lastAddTime > minMillisLullToSubmitBatch ||
batch.size() > maxBatchSizeBeforeSubmit) {
submitBatch()
}
}
}
def submitBatch() {
// here you would need to put the combined args on a format
// suitable for the endpoint you are calling. In this
// example we are just creating a list containing the args
def combinedArgs = batch.collect { it.args }
// further there needs to be a way to map one specific set of
// args in the combined args to a specific response. If the
// endpoint responds with the same order as the args we submitted
// were in, then that can be used otherwise something else like
// an id in the response etc would need to be figured out. Here
// we just assume responses are returned in the order args were submitted
List<String> combinedResponses = postJson(combinedArgs)
combinedResponses.indexed().each { index, response ->
// here the FutureResponse gets a value, can be retrieved with
// futureResponse.get()
batch[index].complete(response)
}
// clear the batch
batch = []
}
// bogus method to fake post
def postJson(combinedArgs) {
println "posting json with batch size: ${combinedArgs.size()}"
combinedArgs.collect { [responseIndex: it] }
}
}
Несколько заметок:
- что-то должно быть в состоянии отреагировать на тот факт, что в течение некоторого времени не было вызовов для добавления. Это подразумевает отдельный поток мониторинга и управляет методами start и stop.
- если у нас есть бесконечная последовательность добавления без пауз, у вас могут закончиться ресурсы. Поэтому код имеет максимальный размер пакета, в котором он будет отправлять пакет, даже если в вызовах для добавления нет затишья.
- код использует блокировку, чтобы удостовериться (или попытаться, как упоминалось выше, я не учел все возможные проблемы здесь), мы остаемся потокобезопасными во время пакетной отправки и т. Д.
- предполагая, что общая идея здесь звучит правильно, вам остается реализовать логику в
submitBatch
, где основная проблема связана с отображением определенных аргументов в конкретные ответы
CompletableFuture
- это класс Java 8. Это можно решить с помощью других конструкций в более ранних выпусках, но я оказался на java 8.
- Я более или менее написал это без выполнения или тестирования, я уверен, что там есть некоторые ошибки.
- Как видно из распечатки ниже, настройка «maxBatchSizeBeforeSubmit» является скорее рекомендацией, чем фактическим макс. Поскольку поток мониторинга некоторое время спит, а затем просыпается, чтобы проверить, как у нас идут дела, потоки, вызывающие метод add, могли накопить любое количество запросов в пакете. Все, что нам гарантировано, - это то, что каждые
millisBetweenChecks
мы просыпаемся и проверяем, как у нас идут дела, и если критерии для отправки партии были достигнуты, то партия будет отправлена.
Если вы не знакомы с Java-фьючерсами и замками, я бы порекомендовал вам прочитать их.
Если вы сохраните приведенный выше код в Groovy-скрипте code.groovy
и запустите его:
~> groovy code.groovy
posting json with batch size: 153
posting json with batch size: 234
posting json with batch size: 243
posting json with batch size: 370
received response with index 0
received response with index 1
received response with index 2
...
received response with index 998
received response with index 999
~>
это должно сработать и распечатать "ответы", полученные из наших поддельных представлений json.