Я пытаюсь ограничить количество добавочных обновлений Карты, но не заинтересован в отправке всей Карты - только добавочные обновления к ней, буферизированные через определенный интервал, представленный как малонаселенные Карты. Я также хотел бы немедленно выпустить первое обновление.
Я играл с операторами window () и throttleLatest ().
Это самое близкое, что я получил, так далеко. В моем случае функция накопителя больше похожа на Map :: putAll, но я использую конкатенацию строк здесь в качестве примера:
// Emit ~10 items per second, group into 1 second windows:
Observable<Observable<String>> windows = Observable.range( 1, 100 )
.concatMap( i -> Observable.just( i ).delay( 100L, TimeUnit.MILLISECONDS ))
.map( String::valueOf )
.window( 1, TimeUnit.SECONDS );
// Accumulator:
BiFunction<String, String, String> accumulator = ( s1, s2 ) -> s1 + "-" + s2;
// From first window, return first item immediately and the last item with the
// accumulator function applied:
Observable<String> firstWindow = windows
.concatMap( window -> window
.scan( accumulator )
.throttleLatest( Long.MAX_VALUE, TimeUnit.SECONDS, true ))
.take( 2 );
// From remaining windows, return the last item with the function applied only:
Observable<String> remainingWindows = windows
.skip( 1 )
.concatMap(
window -> window
.scan( accumulator )
.lastOrError()
.toObservable() );
// Print the results:
firstWindow.concatWith( remainingWindows )
.blockingSubscribe( System.out::println );
Вышеприведенное дает мне почти то, что я хочу, но есть 2-секундная пауза между 2-й и 3-й эмиссией:
1
// 1 second pause
1-2-3-4-5-6-7-8-9
// 2 second pause
10-11-12-13-14-15-16-17-18-19
// 1 second pause
20-21-22-23-24-25-26-27-28-29
// 1 second pause
// . . .
Кто-нибудь может придумать лучший способ выполнить sh это?
РЕДАКТИРОВАТЬ: Хорошо, так Я, кажется, заставил его работать, добавив вызовы publi sh () и connect (), а также убрав skip (1) для оставшихся Windows:
// Emit ~10 items per second, group into 1 second windows:
ConnectableObservable<Observable<String>> windows = Observable.range( 1, 100 )
.concatMap( i -> Observable.just( i ).delay( 100L, TimeUnit.MILLISECONDS ))
.map( String::valueOf )
.window( 1, TimeUnit.SECONDS )
.publish();
// Accumulator:
BiFunction<String, String, String> accumulator = ( s1, s2 ) -> s1 + "-" + s2;
// From first window, return first item immediately and the last item with the
// accumulator function applied:
Observable<String> firstWindow = windows
.concatMap( window -> window
.scan( accumulator )
.throttleLatest( Long.MAX_VALUE, TimeUnit.SECONDS, true ))
.take( 2 );
// From remaining windows, return the last item with the function applied only:
Observable<String> remainingWindows = windows
.concatMap(
window -> window
.scan( accumulator )
.lastOrError()
.toObservable() );
// Print the results:
Observable<String> result = firstWindow.concatWith( remainingWindows ).share();
result.subscribe( System.out::println );
// Connect:
windows.connect();
result.blockingSubscribe();
Все равно хотелось бы услышать если кто-то имеет лучший подход или может дать хорошее представление о том, что здесь происходит. Например, почему skip (1) не требуется для оставшихся Windows?