RxJava - наблюдаемые обновления кэша и выдача наибольших значений - PullRequest
0 голосов
/ 09 сентября 2018

У меня в настоящее время есть Observable<ProductIDUpdate>, испускающий объект, который представляет обновление идентификатора продукта. Обновление может быть идентификатором нового ADDITION или с истекшим сроком действия и требует DELETION.

public class ProductIDUpdate {

    enum UpdateType {
        ADDITION, DELETEION;
    }

    private int id;
    private UpdateType type;

    public ProductIDUpdate(int id) {
        this(id, UpdateType.ADDITION);
    }

    public ProductIDUpdate(int id, UpdateType type) {
        this.id = id;
        this.type = type;
    }
}

Я хочу отслеживать обновление с наибольшим значением идентификатора, поэтому я хочу изменить поток так, чтобы выводился текущий самый высокий идентификатор. Как бы я кешировал элементы обновления в потоке так, чтобы, если текущий самый высокий идентификатор был удален, был получен следующий самый высокий доступный идентификатор?

Ответы [ 2 ]

0 голосов
/ 12 сентября 2018

Может ли что-то подобное удовлетворить ваши требования?

public static void main(String[] args) {
    Observable<ProductIDUpdate> products =
            Observable.just(new ProductIDUpdate(1, ADDITION),
                            new ProductIDUpdate(4, ADDITION),
                            new ProductIDUpdate(2, ADDITION),
                            new ProductIDUpdate(5, ADDITION),
                            new ProductIDUpdate(1, DELETION),
                            new ProductIDUpdate(5, DELETION),
                            new ProductIDUpdate(3, ADDITION),
                            new ProductIDUpdate(6, ADDITION));

    products.distinctUntilChanged((prev, current) -> prev.getId() > current.getId())
            .filter(p -> p.getType().equals(ADDITION))
            .subscribe(System.out::println,
                       Throwable::printStackTrace);

    Observable.timer(1, MINUTES) // just for blocking the main thread
              .toBlocking()
              .subscribe();
}

Это печатает:

ProductIDUpdate{id=1, type=ADDITION}
ProductIDUpdate{id=4, type=ADDITION}
ProductIDUpdate{id=5, type=ADDITION}
ProductIDUpdate{id=6, type=ADDITION}

Если вы удалите filter(), это напечатает:

ProductIDUpdate{id=1, type=ADDITION}
ProductIDUpdate{id=4, type=ADDITION}
ProductIDUpdate{id=5, type=ADDITION}
ProductIDUpdate{id=5, type=DELETION}
ProductIDUpdate{id=6, type=ADDITION}
0 голосов
/ 09 сентября 2018

Я ничего не знаю о Rx, но вот мое понимание:

  • у вас есть несколько идентификаторов продуктов.Мне не ясно, получаете ли вы их со временем как часть некоторых сообщений, отправляемых вашему классу, или если вы знаете все идентификаторы с начала
  • , вы хотите создать поток поверх вашего источника продуктаидентификаторы, которые генерируют наибольший доступный идентификатор в любой момент времени

Если мое понимание верно, как насчет использования PriorityQueue ?Вы кэшируете идентификаторы в очереди с помощью обратного компаратора (по умолчанию он сохраняет наименьший элемент в верхней части кучи), а когда вы хотите выдать новое значение, вы просто выталкиваете верхнее значение.

...