RxJava эквивалент простого примера ThreadPoolExecutor - PullRequest
1 голос
/ 06 октября 2019

Я не играл в Java около 8 лет, и с тех пор многое изменилось. Самой большой проблемой для меня был RxJava / реактивный. Я ищу грубое руководство о том, как сделать приведенный ниже эквивалент полностью реактивным способом.

Основное требование, реализованное ниже с помощью ThreadPoolExecutor, заключается в обработке большого количества Stuff путем вызова удаленной сети. услуга, которая имеет задокументированный лимит скорости 100 запросов / минуту. Моя цель состоит в том, чтобы обработать как можно больше как можно быстрее, не сбрасывая при этом Stuff, но при этом соблюдая ограничение скорости нисходящего потока. Этот код был упрощен, чтобы избежать ошибок, переборок, автоматических выключателей, логики повторных попыток и т. Д.

Этот код в настоящее время работает нормально, но в результате возникает ощущение, что много потраченных впустую потоков, учитывая все неблокирующие реактивные параметры,Даже HTTP-клиент, который я использую для вызова своей службы, предлагает обратно Flowable, который я просто блокирую в каждом из 20 потоков исполнителя.

Я бы хотел понять, что такое реактивный эквивалентдолжно быть. Где я боролся, так это почти все документы, которые я нашел для демонстрации, используя статические источники для Observable (например: Observable.fromArray(1,2,3,4,5)). Я знаю, что решение, вероятно, включает IoScheduler и, возможно, groupBy, но мне еще предстоит выяснить, как объединить Flowable, поступающие от моего HTTP-клиента, в какую-то полную цепочку, выполняющую распараллеливание (до предела, такого как20) и ограничение скорости.

public class Example {
    private static final int THREADS = 20;

    // using https://docs.micronaut.io/latest/guide/index.html#httpClient
    @Client("http://stuff-processor.internal:8080")
    @Inject
    RxHttpClient httpClient;

    private ThreadPoolExecutor executor;
    private final RateLimiter rateLimiter;

    public Example() {
        // up to 20 threads to process the unbounded queue
        // incoming Stuff is very bursty...
        // ...we could go hours without anything and then hundreds could come in
        this.executor = new ThreadPoolExecutor(THREADS, THREADS,
                30,TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        this.executor.allowCoreThreadTimeOut(true);

        // using https://resilience4j.readme.io/docs/ratelimiter
        RateLimiterConfig config = RateLimiterConfig.custom()
                .limitRefreshPeriod(Duration.ofSeconds(60))
                .limitForPeriod(100)
                .timeoutDuration(Duration.ofSeconds(90))
                .build();
        RateLimiterRegistry rateLimiterRegistry = RateLimiterRegistry.of(config);
        rateLimiter = rateLimiterRegistry.rateLimiter("stuff-processor", config);
    }

    /**
     * Called when the user takes an action that can cause 1 or 1000s of new
     * Stuff to be entered into the system. Each instance of Stuff results in
     * a separate call to this method. Ex: 100 Stuffs = 100 calls.
     */
    void onNewStuff(Stuff stuff) {
        final Runnable task = () -> {
            final Flowable<HttpResponse<Boolean>> flowable = httpClient.exchange(
                    HttpRequest.POST("/process", stuff),
                    Boolean.class);

            final HttpResponse<Boolean> response = flowable.blockingFirst();
            if (response.body()) {
                System.out.println("Success!");
            } else {
                System.out.println("Fail :(");
            }
        };

        final Runnable rateLimitedTask = 
                RateLimiter.decorateRunnable(rateLimiter, task);
        executor.submit(rateLimitedTask);
    }
}

Спасибо!

1 Ответ

1 голос
/ 07 октября 2019

Во-первых, чтобы построить это полностью неблокирующим образом, вам нужно использовать неблокирующую асинхронную клиентскую библиотеку HTTP, такую ​​как Netty. Я не уверен, как работает RxHttpClient.

Скажем, у вас есть список stuff с. Вот как я бы это сделал:

Observable.fromIterable(stuffs).flatMap(a -> client.nonBlockingPost(a).subscribeOn(Schedulers.io())).subscribe();

flatMap объединяет ответы по мере их поступления.

Чтобы ограничить скорость, у вас flatMap есть второй параметр, который ограничиваетколичество внутренних потоков, на которые он подписывается параллельно. Скажем, вы хотите сделать не более 10 звонков одновременно. Сделайте это:

Observable.fromIterable(stuffs).flatMap(a -> client.nonBlockingPost(a).subscribeOn(Schedulers.io()), 10).subscribe();
...