Ограничение скорости с помощью Rx Java с помощью окна () - PullRequest
0 голосов
/ 21 февраля 2020

Я пытаюсь ограничить количество добавочных обновлений Карты, но не заинтересован в отправке всей Карты - только добавочные обновления к ней, буферизированные через определенный интервал, представленный как малонаселенные Карты. Я также хотел бы немедленно выпустить первое обновление.

Я играл с операторами window () и throttleLatest ().

Это самое близкое, что я получил, так далеко. В моем случае функция накопителя больше похожа на Map :: putAll, но я использую конкатенацию строк здесь в качестве примера:

        // Emit ~10 items per second, group into 1 second windows:
        Observable<Observable<String>> windows = Observable.range( 1, 100 )
            .concatMap( i -> Observable.just( i ).delay( 100L, TimeUnit.MILLISECONDS ))
            .map( String::valueOf )
            .window( 1, TimeUnit.SECONDS );

        // Accumulator:
        BiFunction<String, String, String> accumulator = ( s1, s2 ) -> s1 + "-" + s2;

        // From first window, return first item immediately and the last item with the
        // accumulator function applied:
        Observable<String> firstWindow = windows
                .concatMap( window -> window
                        .scan( accumulator )
                        .throttleLatest( Long.MAX_VALUE, TimeUnit.SECONDS, true ))
                .take( 2 );

        // From remaining windows, return the last item with the function applied only:
        Observable<String> remainingWindows = windows
                .skip( 1 )
                .concatMap(
                        window -> window
                            .scan( accumulator )
                            .lastOrError()
                            .toObservable() );

        // Print the results:
        firstWindow.concatWith( remainingWindows )
            .blockingSubscribe( System.out::println );

Вышеприведенное дает мне почти то, что я хочу, но есть 2-секундная пауза между 2-й и 3-й эмиссией:

1
// 1 second pause
1-2-3-4-5-6-7-8-9
// 2 second pause
10-11-12-13-14-15-16-17-18-19
// 1 second pause
20-21-22-23-24-25-26-27-28-29
// 1 second pause
// . . .

Кто-нибудь может придумать лучший способ выполнить sh это?


РЕДАКТИРОВАТЬ: Хорошо, так Я, кажется, заставил его работать, добавив вызовы publi sh () и connect (), а также убрав skip (1) для оставшихся Windows:

        // Emit ~10 items per second, group into 1 second windows:
        ConnectableObservable<Observable<String>> windows = Observable.range( 1, 100 )
                .concatMap( i -> Observable.just( i ).delay( 100L, TimeUnit.MILLISECONDS ))
                .map( String::valueOf )
                .window( 1, TimeUnit.SECONDS )
                .publish();

        // Accumulator:
        BiFunction<String, String, String> accumulator = ( s1, s2 ) -> s1 + "-" + s2;

        // From first window, return first item immediately and the last item with the
        // accumulator function applied:
        Observable<String> firstWindow = windows
                .concatMap( window -> window
                        .scan( accumulator )
                        .throttleLatest( Long.MAX_VALUE, TimeUnit.SECONDS, true ))
                .take( 2 );

        // From remaining windows, return the last item with the function applied only:
        Observable<String> remainingWindows = windows
                .concatMap(
                        window -> window
                            .scan( accumulator )
                            .lastOrError()
                            .toObservable() );

        // Print the results:
        Observable<String> result = firstWindow.concatWith( remainingWindows ).share();
        result.subscribe( System.out::println );

        // Connect:
        windows.connect();
        result.blockingSubscribe();

Все равно хотелось бы услышать если кто-то имеет лучший подход или может дать хорошее представление о том, что здесь происходит. Например, почему skip (1) не требуется для оставшихся Windows?

...