У меня есть тема, которую я использую в качестве шины событий. Эта шина испускает несколько предметов общего типа:
static class EventBus {
private final Subject<Event> subject = BehaviorSubject.create();
/**
* @param event Event to publish to the subject
*/
void publish(@NonNull Event event) {
this.subject.onNext(event);
}
/**
* Listen to events of a specific class
*
* @param eventClass class of event to listen for
* @param <T> generic type
* @return Observable
*/
<T extends Event> Observable<T> listenForEvents(@NonNull Class<T> eventClass) {
return this.subject
.serialize()
.filter(eventClass::isInstance)
.cast(eventClass);
}
}
abstract static class Event {
int id;
public Event(int id) {
this.id = id;
}
}
final class EventA extends Event {
EventA(int id) {
super(id);
}
}
final class EventB extends Event {
EventB(int id) {
super(id);
}
}
Вещи, которые хотят прослушивать определенные события, могут вызывать listenForEvents
и передавать в класс, который они хотят прослушивать.
Однако я хочу поведение BehaviorSubject для каждого типа класса. Пример иллюстрирует это:
void main() {
EventBus eventBus = new EventBus();
// publish some A's
eventBus.publish(new EventA(1));
eventBus.publish(new EventA(2));
eventBus.publish(new EventA(3));
// publish a B
eventBus.publish(new EventB(4));
eventBus.listenForEvents(EventA.class)
.subscribe(eventA -> {
// 1. I want to receive EventA(3) but I won't receive
// anything because last value in the BehaviorSubject is an EventB
Log.d("TAG", String.valueOf(eventA.id));
});
eventBus.publish(new EventA(5));
// 2. It's only if I put another EventA into the subject
// after I subscribe that I'll seen anything emitted.
}
У меня есть несколько возможных решений, но ни одно из них не кажется хорошим =)
1.) Несколько объектов BehaviorSubjects, по одному для каждого типа события; они могут быть ленивыми созданные
2.) Отслеживать «последнее событие типа X» вне наблюдаемого потока
3.) Конвертируйте BehaviorSubject
с ReplaySubject
и используйте .debounce(10, TimeUnit.MILLISECONDS)
и предполагайте, что воспроизводимые элементы будут отправляться быстро, а новые элементы не будут публиковаться быстро
private final Subject<Event> subject = ReplaySubject.create();
<T extends Event> Observable<T> listenForEvents(@NonNull Class<T> eventClass) {
return this.subject
.serialize()
.filter(eventClass::isInstance)
.cast(eventClass)
.debounce(10, TimeUnit.MILLISECONDS);
}
Есть ли чистый способ сделать это с существующими операторами rx?