Оптимальная оркестровка списка сетевых вызовов и задач обработки в Java - PullRequest
0 голосов
/ 23 ноября 2018

У меня есть следующий рабочий процесс: есть n записей, которые необходимо извлечь по сети, и впоследствии n дорогостоящих вычислений, которые необходимо выполнить для каждой.Вставьте код, это будет выглядеть следующим образом:

List<Integer> ids = {1,2,....n};
ids.forEach(id -> {
    Record r = RetrieveRecord(id); // Blocking IO
    ProcessRecord(r); // CPU Intensive
})

Я хотел бы преобразовать блокирующую часть в асинхронный, чтобы минимизировать время с помощью одного потока - по существу, обеспечивая запись i+1извлекается при обработке записи i.Чтобы выполнение выглядело следующим образом:

Retrieve(1).start()
Retrieve(1).onEnd(() -> { start Retrieve(2), Process(1) })
Retrieve(2).onEnd(() -> { start Retrieve(3), Process(2) })
....

Теперь я могу придумать наивный способ реализовать это с List<> и CompletableFuture, но для этого потребуется обработать первую запись по-другому.

Есть ли более элегантный способ решения этой проблемы с помощью реактивных потоков?Решение, которое, возможно, позволило бы мне легко настроить, сколько записей Process() может отставать от Retreive()?

Ответы [ 3 ]

0 голосов
/ 24 ноября 2018

Вот то, что я наконец-то придумал, похоже, для выполнения работы:

Flowable.just(1,2,3,4,5,6) // Completes in 1 + 6 * 3 = 19 secs
    .concatMapEager(v->
            Flowable.just(v)
            .subscribeOn(Schedulers.io())
            .map(  e->{
                System.out.println(getElapsed("Req " + e + " started");
                Thread.sleep(1000); // Network: 1 sec
                System.out.println(getElapsed("Req " + e + " done");
                return e;
            }, requestsOnWire, 1) // requestsOnWire = K = 2
           .blockingSubscribe(new DisposableSubscriber<Integer>() {
        @Override
        protected void onStart() {
            request(1);
        }
        @Override
        public void onNext(Integer s) {
            request(1);
            System.out.println("Proc " + s + " started");
            try {
                Thread.sleep(3000); // Compute: 3 secs
                System.out.println("Proc " + s + " done");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        @Override
        public void onError(Throwable t) {

        }
        @Override
        public void onComplete() {

        }
    });

Ниже приведен порядок выполнения.Обратите внимание, что в любой данный момент времени обрабатывается 1 запись, не более 2 запросов по проводам и не более 2 необработанных записей в памяти (процесс отстает от K = 2) записей:

 0 secs: Req 1 started
       : Req 2 started
 1 secs: Req 2 done
       : Req 1 done
       : Proc 1 started
       : Req 3 started
       : Req 4 started
 2 secs: Req 3 done
       : Req 4 done
 4 secs: Proc 1 done
       : Proc 2 started
       : Req 5 started
 5 secs: Req 5 done
 7 secs: Proc 2 done
       : Proc 3 started
       : Req 6 started
 8 secs: Req 6 done
10 secs: Proc 3 done
       : Proc 4 started
13 secs: Proc 4 done
       : Proc 5 started
16 secs: Proc 5 done
       : Proc 6 started
19 secs: Proc 6 done

Надеюсь, что здесь нет анти-паттернов / ловушек.

0 голосов
/ 24 ноября 2018

Решение с использованием df4j с явным асинхронным семафором:

import org.df4j.core.boundconnector.permitstream.Semafor;
import org.df4j.core.tasknode.Action;
import org.df4j.core.tasknode.messagestream.Actor;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;

public class AsyncSemaDemo extends Actor {
    List<Integer> ids = Arrays.asList(1, 2, 3, 4, 5);
    Semafor sema = new Semafor(this, 2);
    Iterator<Integer> iter = ids.iterator();
    int tick = 100; // millis
    CountDownLatch done = new CountDownLatch(ids.size());
    long start = System.currentTimeMillis();

    private void printClock(String s) {
        long ticks = (System.currentTimeMillis() - start)/tick;
        System.out.println(Long.toString(ticks) + " " + s);
    }

    CompletableFuture<Integer> Retrieve(Integer e) {
        return CompletableFuture.supplyAsync(() -> {
            printClock("Req " + e + " started");
            try {
                Thread.sleep(tick); // Network
            } catch (InterruptedException ex) {
            }
            printClock(" Req " + e + " done");
            return e;
        }, executor);
    }

    void ProcessRecord(Integer s) {
        printClock(" Proc " + s + " started");
        try {
            Thread.sleep(tick*2); // Compute
        } catch (InterruptedException ex) {
        }
        printClock("  Proc " + s + " done");
    }

    @Action
    public void act() {
        if (iter.hasNext()) {
            CompletableFuture<Integer> fut = Retrieve(iter.next());
            fut.thenRun(sema::release);
            fut.thenAcceptAsync(this::ProcessRecord,  executor)
            .thenRun(done::countDown);
        } else {
            super.stop();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AsyncSemaDemo asyncSemaDemo = new AsyncSemaDemo();
        asyncSemaDemo.start(ForkJoinPool.commonPool());
        asyncSemaDemo.done.await();
    }
}

его журнал должен быть:

0 Req 1 started
0 Req 2 started
1  Req 1 done
1  Proc 1 started
1 Req 3 started
1  Req 2 done
1  Proc 2 started
1 Req 4 started
2  Req 3 done
2  Proc 3 started
2 Req 5 started
2  Req 4 done
2  Proc 4 started
3   Proc 1 done
3  Req 5 done
3  Proc 5 started
3   Proc 2 done
4   Proc 3 done
4   Proc 4 done
5   Proc 5 done

Обратите внимание, как это решение близко к моему предыдущему ответу со стандартом java.util.concurrent.Semaphore.

0 голосов
/ 23 ноября 2018

Итак, у вас есть N задач, и вы хотите запускать их параллельно, но не более K задач одновременно.Самый естественный способ - изначально иметь генератор задач и счетчик разрешений с K-разрешениями.Генератор задач создает K задач и ожидает дополнительных разрешений.Каждое разрешение принадлежит некоторой задаче и возвращается после ее завершения.Стандартным счетчиком разрешений в Java является класс java.util.concurrent.Semaphore:

List<Integer> ids = {1,2,....n};
Semaphore sem = new Semaphore(K);
ids.forEach(id -> {
    sem.aquire();
    CompletableFuture<Data> fut = Retrieve(id);
    fut.thenRun(sem::release);
    fut.thenAcceptAsync(this::ProcessRecord, someExecutor);
})

Поскольку генератор задач занимает только один поток, нет смысла делать его асинхронным.Если, однако, вы не хотите использовать выделенный поток для генератора задач и хотите реализовать асинхронное решение, то главный вопрос заключается в том, какой класс может играть роль счетчика асинхронных разрешений.У вас есть 3 варианта:

  • использовать неявный асинхронный счетчик разрешений, который является частью реактивных потоков, найденных в RxJava, проекте Reactor и т. Д.
  • использовать явный асинхронный семафор org.df4j.core.boundconnector.permitstream.Semafor, включенный вмоя асинхронная библиотека df4j
  • сделай сам
...