GParsExecutorsPool.withPool не устанавливает асинхронные соединения - PullRequest
0 голосов
/ 26 мая 2019

Я использую groovy и Gpars для асинхронного соединения. Я получаю огромный JSON как запрос к моему API, я разделяю JSON, используя JSON Path. $. $ {jsonobjstpath} .. [i] [j], где i и j являются значениями в диапазоне 0:20 и циклически повторяются. Я могу получить правильный расщепленный JSON. и эти пакеты JSON я отправляю в мой API с помощью GParsExecutorsPool.withPool. Но gpar ждет ответа. Допустим, если API обработки для одного запроса занимает 10 секунд, gpar ждет 10 секунд, чтобы отправить контроллер в цикл. Мой код ниже.

    import groovyx.gpars.GParsExecutorsPool;
    import groovyx.gpars.GParsPool;
    import jsr166y.ForkJoinPool;
    import jsr166y.RecursiveTask;

    def  invoke(mega) {   
        def size=mega.get("size"); //get size of actual JSON objects
        def body=mega.get("content.body"); // Load JSON body
        int counter=Math.ceil(size/20); //Get Number of loops to run
        def Path="/AsyncInmp"; // Path to call function 
        def Name="SplitJsonObject"; //Name of Function
        int i=0;
        int j=19;
        while(j<=size) {
            msg.put("i",i); //Send i value to function
            msg.put("j",j); // Send j value to function
            callPolicy(Path,Name,body); //Call function json path to get split json, receiving JSON with i and j values
            def split_body=resp.body;//response from split json
            def Path2="/AsyncInmp"; //path to connection function
            def Name2="connect"; //name of connection function
            GParsExecutorsPool.withPool {
              (0..<1).eachParallel { k -> 
                callPolicy(Path2, Name2,split_body) //Call function to connect using gpars, This is not working
              }
            }
            j=j+20;
            i=i+20;
        }
        return true;
    }
  1. Так как я могу сделать асинхронный вызов с использованием gpar, как только мой запрос на разделение json будет готов
  2. как я могу получить ответ от всех асинхронных вызовов

1 Ответ

2 голосов
/ 26 мая 2019

Вы вызываете withPool внутри вашего while цикла, а также используете диапазон размера 1 в вашем eachParallel, я думаю, что эти вещи в совокупности заставляют ваш код вести себя однопоточным образом.

Изменив это на что-то вроде этого:

import java.util.concurrent.CopyOnWriteArrayList

def futures = [] as CopyOnWriteArrayList
GParsExecutorsPool.withPool {
  while(...) {
    ...
    futures << {
      callPolicy(Path2, Name2,split_body)
    }.async().call()
  }
}

// wait for all requests to complete
def results = futures*.get() // or futures.collect { it.get() } if this breaks

// results is now a list of return values from callPolicy

Я не тестировал и не запускал этот код, но он должен дать вам представление о том, как вы можете двигаться вперед.

<- редактировать после комментариев ->

Рабочий пример:

@Grab('org.codehaus.gpars:gpars:1.0.0')

import groovyx.gpars.*
import java.util.concurrent.*
import java.util.concurrent.atomic.*
import static groovyx.gpars.dataflow.Dataflow.task

random    = new Random()
sequence  = new AtomicInteger(-1)

def promises = [] as CopyOnWriteArrayList

GParsPool.withPool(25) { pool -> 
  10.times { index ->
    promises << task { 
      callPolicy(index)
    }
  }
}

def results = promises*.get()

results.each { map -> 
  println map
}

def callPolicy(index) {
  Thread.sleep(random.nextInt(100) % 100)
  [index: index, sequence: sequence.incrementAndGet(), time: System.currentTimeMillis()]
}

, который производит следующий тип вывода:

~> groovy solution.groovy
[index:0, sequence:9, time:1558893973348]
[index:1, sequence:1, time:1558893973305]
[index:2, sequence:8, time:1558893973337]
[index:3, sequence:5, time:1558893973322]
[index:4, sequence:7, time:1558893973337]
[index:5, sequence:4, time:1558893973320]
[index:6, sequence:3, time:1558893973308]
[index:7, sequence:6, time:1558893973332]
[index:8, sequence:0, time:1558893973282]
[index:9, sequence:2, time:1558893973308]

~>

где мы видим, что результаты возвращаются, а также что вызовы выполняются многопоточным образом, поскольку значения sequence и index не являются последовательными.

...