Мне любопытно, как API потока Java связан с RxJava, и, в частности, как я могу написать простые процессоры, использующие RxJava.
Допустим, у вас есть простое преобразование, отображающее строки в их длины:
Function<String,Integer> transformation = x -> x.length();
Observable<String> in = ...;
Observable<Integer> out = in.map(transformation);
Теперь, скажем, я хочу реализовать преобразование карты с помощью Procecssor.Какой лучший способ реализовать это?Моя идея заключается в следующем - но я не уверен, что это лучшая практика или даже полностью правильная:
class TestProcessor implements Processor<String,Integer> {
private final UnicastProcessor<Integer> processor = UnicastProcessor.create();
private Subscription subscription;
@Override
public void subscribe(Subscriber<? super Integer> subscriber) {
processor.subscribe(subscriber);
}
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
this.subscription.request(1);
}
private static Integer transform(String s) {
return s.length();
}
@Override
public void onNext(String s) {
try {
Integer transformed = transform(s);
processor.onNext(t);
} catch (Exception e) {
processor.onError(e);
}
}
@Override
public void onError(Throwable throwable) {
processor.onError(throwable);
}
@Override
public void onComplete() {
processor.onComplete();
}
}
Спасибо