Использование Rx Java для записи бесконечного потока сгруппированных событий во вращающиеся файлы - PullRequest
1 голос
/ 17 марта 2020

Я пытаюсь добиться следующего поведения:

  • Периодически опрашивать / генерировать поток событий (короткая продолжительность, скажем, 1 секунда)
  • Затем события группируются по к некоторому внутреннему признаку.
  • Каждая группа событий записывается в соответствующий файл немедленно (это критическое значение для поведения, которое я хочу поддерживать)
  • Ожидаются файлы для повторного использования для сопоставления групп (имеющих одинаковый ключ) в последующих событиях, пока они не будут запечатаны / повернуты
  • При более длительной (скажем, 5 секундах) файл запечатываются / вращаются и действуют при использовании дополнительных подписчиков

Я написал следующий пример кода для достижения вышеуказанного поведения:


    private static final Integer EVENTS = 3;
    private static final Long SHORTER = 1L;
    private static final Long LONGER = 5L;
    private static final Long SLEEP = 100000L;

    public static void main(final String[] args) throws Exception {

        val files = new DualHashBidiMap<Integer, File>();

        Observable.just(EVENTS)
                .flatMap(num -> Observable.fromIterable(ThreadLocalRandom.current().ints(num).boxed().collect(Collectors.toList())))
                .groupBy(num -> Math.abs(num % 2))
                .repeatWhen(completed -> completed.delay(SHORTER, TimeUnit.SECONDS))
                .map(group -> {
                    val file = files.computeIfAbsent(group.getKey(), Unchecked.function(key -> File.createTempFile(String.format("%03d-", key), ".txt")));
                    group.map(Object::toString).toList().subscribe(lines -> FileUtils.writeLines(file, StandardCharsets.UTF_8.name(), lines, true));
                    return file;
                })
                .buffer(LONGER, TimeUnit.SECONDS)
                .flatMap(Observable::fromIterable)
                .distinct(File::getName)
                .doOnNext(files::removeValue)
                .doOnNext(file -> System.out.println("File - '" + file + "', Lines - " + FileUtils.readLines(file, StandardCharsets.UTF_8)))
                .subscribe();
        Thread.sleep(SLEEP);
    }

Пока он работает как положено (пока отложите вопрос безопасности потоков для доступа к карте, я использую Биди-карта из commons-collections4 только для демонстрации), мне было интересно, есть ли способ реализовать вышеуказанную функциональность в форме pure RX, не полагаясь на доступ к внешней карте?

Обратите внимание, что крайне важно для файлов, которые должны быть записаны сразу при создании группы, то есть мы должны сделать файл действительным за пределами сгенерированных групп событий

Заранее спасибо.

1 Ответ

1 голос
/ 18 марта 2020

Интересный вопрос .. Я могу ошибаться, но я не думаю, что вы можете избежать Map из Files где-то в конвейере.

Я думаю, что мое решение может быть дополнительно очищено , но, кажется, выполняет sh следующее:

  • Устраняет необходимость в двунаправленном отображении
  • Избегает необходимости вызова Map.remove(...)

Я предлагаю вам трактовать Map из Files как отдельный Observable, испускающий совершенно новый Map с более медленным интервалом:

    Observable<HashMap<Integer, File>> fileObservable = Observable.fromCallable(
                () -> new HashMap<Integer, File>() )
            .repeatWhen( completed -> completed.delay( LONGER, TimeUnit.SECONDS ));

Затем в вашем событии Observable, вы можете позвонить .withLatestFrom( fileObservable, ( group, files ) -> {...} ) ( примечание: этот блок еще не завершен ):

    Observable.just( EVENTS )
        .flatMap( num -> Observable.fromIterable(
                ThreadLocalRandom.current().ints( num ).boxed().collect( Collectors.toList() )))
        .groupBy( num -> Math.abs( num % 2 ))
        .repeatWhen( completed -> completed.delay( SHORTER, TimeUnit.SECONDS ))
        .withLatestFrom( fileObservable, ( group, files ) -> {

            File file = files.computeIfAbsent(
                    group.getKey(),
                    Unchecked.function( key -> File.createTempFile( String.format( "%03d-", key ), ".txt" )));

            group.map( Object::toString ).toList()
                .subscribe( lines -> FileUtils.writeLines(file, StandardCharsets.UTF_8.name(), lines, true ));

            return files;
        } )

Пока все хорошо, вы получаете свой последний набор Files в комплекте наряду с вашими событиями. Затем вы должны обработать Files. Я думаю, что вы можете сделать это, используя distinctUntilChanged(). Он должен быть довольно эффективным, поскольку он будет вызывать HashMap.equals(...) под прикрытием, а идентичность объекта Map не меняется в большинстве случаев. HashMap.equals(...) сначала проверяет идентичность.

Поскольку на данный момент вы действительно заинтересованы в обработке предыдущего набора испущенных Files, а не текущего, вы можете использовать .scan(( prev, current ) -> {...} ) оператор. На этом завершенный блок кода сверху:

    Observable.just( EVENTS )
        .flatMap( num -> Observable.fromIterable(
                ThreadLocalRandom.current().ints( num ).boxed().collect( Collectors.toList() )))
        .groupBy( num -> Math.abs( num % 2 ))
        .repeatWhen( completed -> completed.delay( SHORTER, TimeUnit.SECONDS ))
        .withLatestFrom( fileObservable, ( group, files ) -> {

            File file = files.computeIfAbsent(
                    group.getKey(),
                    Unchecked.function( key -> File.createTempFile( String.format( "%03d-", key ), ".txt" )));

            group.map( Object::toString ).toList()
                .subscribe( lines -> FileUtils.writeLines(file, StandardCharsets.UTF_8.name(), lines, true ));

            return files;
        } )
        .distinctUntilChanged()
        .scan(( prev, current ) -> {

            Observable.fromIterable( prev.entrySet() )
                .map( Entry::getValue )
                .subscribe( file -> System.out.println( "File - '" + file + "', Lines - " +
                                FileUtils.readLines( file, StandardCharsets.UTF_8 )));

            return current;
        } )
        .subscribe();

    Thread.sleep( SLEEP );

Немного длиннее, чем ваше первоначальное решение, но может решить пару проблем.

...