Интересный вопрос .. Я могу ошибаться, но я не думаю, что вы можете избежать Map
из Files
где-то в конвейере.
Я думаю, что мое решение может быть дополнительно очищено , но, кажется, выполняет sh следующее:
- Устраняет необходимость в двунаправленном отображении
- Избегает необходимости вызова
Map.remove(...)
Я предлагаю вам трактовать Map
из Files
как отдельный Observable
, испускающий совершенно новый Map
с более медленным интервалом:
Observable<HashMap<Integer, File>> fileObservable = Observable.fromCallable(
() -> new HashMap<Integer, File>() )
.repeatWhen( completed -> completed.delay( LONGER, TimeUnit.SECONDS ));
Затем в вашем событии Observable
, вы можете позвонить .withLatestFrom( fileObservable, ( group, files ) -> {...} )
( примечание: этот блок еще не завершен ):
Observable.just( EVENTS )
.flatMap( num -> Observable.fromIterable(
ThreadLocalRandom.current().ints( num ).boxed().collect( Collectors.toList() )))
.groupBy( num -> Math.abs( num % 2 ))
.repeatWhen( completed -> completed.delay( SHORTER, TimeUnit.SECONDS ))
.withLatestFrom( fileObservable, ( group, files ) -> {
File file = files.computeIfAbsent(
group.getKey(),
Unchecked.function( key -> File.createTempFile( String.format( "%03d-", key ), ".txt" )));
group.map( Object::toString ).toList()
.subscribe( lines -> FileUtils.writeLines(file, StandardCharsets.UTF_8.name(), lines, true ));
return files;
} )
Пока все хорошо, вы получаете свой последний набор Files
в комплекте наряду с вашими событиями. Затем вы должны обработать Files
. Я думаю, что вы можете сделать это, используя distinctUntilChanged()
. Он должен быть довольно эффективным, поскольку он будет вызывать HashMap.equals(...)
под прикрытием, а идентичность объекта Map
не меняется в большинстве случаев. HashMap.equals(...)
сначала проверяет идентичность.
Поскольку на данный момент вы действительно заинтересованы в обработке предыдущего набора испущенных Files
, а не текущего, вы можете использовать .scan(( prev, current ) -> {...} )
оператор. На этом завершенный блок кода сверху:
Observable.just( EVENTS )
.flatMap( num -> Observable.fromIterable(
ThreadLocalRandom.current().ints( num ).boxed().collect( Collectors.toList() )))
.groupBy( num -> Math.abs( num % 2 ))
.repeatWhen( completed -> completed.delay( SHORTER, TimeUnit.SECONDS ))
.withLatestFrom( fileObservable, ( group, files ) -> {
File file = files.computeIfAbsent(
group.getKey(),
Unchecked.function( key -> File.createTempFile( String.format( "%03d-", key ), ".txt" )));
group.map( Object::toString ).toList()
.subscribe( lines -> FileUtils.writeLines(file, StandardCharsets.UTF_8.name(), lines, true ));
return files;
} )
.distinctUntilChanged()
.scan(( prev, current ) -> {
Observable.fromIterable( prev.entrySet() )
.map( Entry::getValue )
.subscribe( file -> System.out.println( "File - '" + file + "', Lines - " +
FileUtils.readLines( file, StandardCharsets.UTF_8 )));
return current;
} )
.subscribe();
Thread.sleep( SLEEP );
Немного длиннее, чем ваше первоначальное решение, но может решить пару проблем.