Пакетные запросы в Groovy? - PullRequest
       8

Пакетные запросы в Groovy?

0 голосов
/ 16 апреля 2019

Я новичок в Groovy и немного растерялся в том, как группировать запросы, чтобы они могли быть отправлены на сервер в виде пакета, а не по отдельности, как у меня сейчас:

class Handler {
    private String jobId
    // [...]
    void submit() {
        // [...]
        // client is a single instance of Client used by all Handlers
        jobId = client.add(args)
    }
}

class Client {
    //...
    String add(String args) {
        response = postJson(args)
        return parseIdFromJson(response)
    }
}

Как и сейчас, что-то вызывает Client.add(), который отправляет REST API и возвращает проанализированный результат.

Проблема, с которой я столкнулся, заключается в том, что метод add() вызывается, может быть, тысячи раз подряд, и было бы намного эффективнее собрать все args, переданные в add(), подождать, пока наступит момент когда перестают поступать вызовы add(), а затем POST API REST один раз для этого пакета, посылая все аргументы за один раз.

Возможно ли это? Потенциально, add() может немедленно вернуть поддельный идентификатор, если пакетирование происходит, происходит отправка, и клиент может позже узнать поиск между поддельным идентификатором и идентификатором, полученным из REST API (который будет возвращать идентификаторы в порядке, соответствующем на отправленные ему аргументы).

1 Ответ

1 голос
/ 18 апреля 2019

Как уже упоминалось в комментариях, это может быть хорошим случаем для gpars, который отлично подходит для подобных сценариев.

Это на самом деле не столько к Groovy, сколько к асинхронному программированию на Java и на JVM в целом.

Если вы хотите придерживаться параллельных идиом Java, я собрал фрагмент кода, который вы могли бы использовать в качестве потенциальной отправной точки. Это не было проверено и крайние случаи не были рассмотрены. Я написал это для удовольствия, и поскольку это асинхронное программирование, и я не потратил достаточно времени на размышления об этом, я подозреваю, что там есть отверстия, достаточно большие, чтобы пропустить танк.

Как говорится, здесь приведен код, который делает попытку пакетирования запросов:

import java.util.concurrent.* 
import java.util.concurrent.locks.* 

// test code 
def client = new Client()

client.start()
def futureResponses = []
1000.times { 
  futureResponses << client.add(it as String)
}
client.stop()

futureResponses.each { futureResponse ->
  // resolve future...will wait if the batch has not completed yet
  def response = futureResponse.get()
  println "received response with index ${response.responseIndex}"
}
// end of test code 

class FutureResponse extends CompletableFuture<String> {
  String args
}

class Client {
  int minMillisLullToSubmitBatch = 100
  int maxBatchSizeBeforeSubmit   = 100
  int millisBetweenChecks        = 10
  long lastAddTime               = Long.MAX_VALUE

  def batch = []
  def lock = new ReentrantLock()
  boolean running = true

  def start() {
    running = true
    Thread.start { 
      while (running) {
        checkForSubmission()
        sleep millisBetweenChecks
      }
    }
  }

  def stop() {
    running = false
    checkForSubmission()
  }

  def withLock(Closure c) {
    try { 
      lock.lock()
      c.call()
    } finally { 
      lock.unlock()
    }    
  }

  FutureResponse add(String args) {
    def future = new FutureResponse(args: args)

    withLock { 
      batch << future
      lastAddTime = System.currentTimeMillis()
    }

    future
  }

  def checkForSubmission() {
    withLock { 
      if (System.currentTimeMillis() - lastAddTime > minMillisLullToSubmitBatch ||
          batch.size() > maxBatchSizeBeforeSubmit) { 
        submitBatch()
      }
    }
  }

  def submitBatch() {
    // here you would need to put the combined args on a format 
    // suitable for the endpoint you are calling. In this 
    // example we are just creating a list containing the args
    def combinedArgs = batch.collect { it.args }

    // further there needs to be a way to map one specific set of 
    // args in the combined args to a specific response. If the 
    // endpoint responds with the same order as the args we submitted
    // were in, then that can be used otherwise something else like 
    // an id in the response etc would need to be figured out. Here 
    // we just assume responses are returned in the order args were submitted
    List<String> combinedResponses = postJson(combinedArgs)
    combinedResponses.indexed().each { index, response -> 
      // here the FutureResponse gets a value, can be retrieved with 
      // futureResponse.get()
      batch[index].complete(response)
    }

    // clear the batch
    batch = []
  }

  // bogus method to fake post
  def postJson(combinedArgs) {
    println "posting json with batch size: ${combinedArgs.size()}"
    combinedArgs.collect { [responseIndex: it] }
  }
}

Несколько заметок:

  • что-то должно быть в состоянии отреагировать на тот факт, что в течение некоторого времени не было вызовов для добавления. Это подразумевает отдельный поток мониторинга и управляет методами start и stop.
  • если у нас есть бесконечная последовательность добавления без пауз, у вас могут закончиться ресурсы. Поэтому код имеет максимальный размер пакета, в котором он будет отправлять пакет, даже если в вызовах для добавления нет затишья.
  • код использует блокировку, чтобы удостовериться (или попытаться, как упоминалось выше, я не учел все возможные проблемы здесь), мы остаемся потокобезопасными во время пакетной отправки и т. Д.
  • предполагая, что общая идея здесь звучит правильно, вам остается реализовать логику в submitBatch, где основная проблема связана с отображением определенных аргументов в конкретные ответы
  • CompletableFuture - это класс Java 8. Это можно решить с помощью других конструкций в более ранних выпусках, но я оказался на java 8.
  • Я более или менее написал это без выполнения или тестирования, я уверен, что там есть некоторые ошибки.
  • Как видно из распечатки ниже, настройка «maxBatchSizeBeforeSubmit» является скорее рекомендацией, чем фактическим макс. Поскольку поток мониторинга некоторое время спит, а затем просыпается, чтобы проверить, как у нас идут дела, потоки, вызывающие метод add, могли накопить любое количество запросов в пакете. Все, что нам гарантировано, - это то, что каждые millisBetweenChecks мы просыпаемся и проверяем, как у нас идут дела, и если критерии для отправки партии были достигнуты, то партия будет отправлена.

Если вы не знакомы с Java-фьючерсами и замками, я бы порекомендовал вам прочитать их.

Если вы сохраните приведенный выше код в Groovy-скрипте code.groovy и запустите его:

~> groovy code.groovy
posting json with batch size: 153
posting json with batch size: 234
posting json with batch size: 243
posting json with batch size: 370
received response with index 0
received response with index 1
received response with index 2
...
received response with index 998
received response with index 999

~> 

это должно сработать и распечатать "ответы", полученные из наших поддельных представлений json.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...