Реализация потокового API-процессора с использованием rxjava - PullRequest
0 голосов
/ 19 сентября 2019

Мне любопытно, как API потока Java связан с RxJava, и, в частности, как я могу написать простые процессоры, использующие RxJava.

Допустим, у вас есть простое преобразование, отображающее строки в их длины:

Function<String,Integer> transformation = x -> x.length();

Observable<String> in = ...;
Observable<Integer> out = in.map(transformation);

Теперь, скажем, я хочу реализовать преобразование карты с помощью Procecssor.Какой лучший способ реализовать это?Моя идея заключается в следующем - но я не уверен, что это лучшая практика или даже полностью правильная:

class TestProcessor implements Processor<String,Integer> {
    private final UnicastProcessor<Integer> processor = UnicastProcessor.create();
    private Subscription subscription;

    @Override
    public void subscribe(Subscriber<? super Integer> subscriber) {
        processor.subscribe(subscriber);
    }

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1);
    }

    private static Integer transform(String s) {
        return s.length();
    }

    @Override
    public void onNext(String s) {
        try {
            Integer transformed = transform(s);
            processor.onNext(t);
        } catch (Exception e) {
            processor.onError(e);
        }
    }

    @Override
    public void onError(Throwable throwable) {
        processor.onError(throwable);
    }

    @Override
    public void onComplete() {
        processor.onComplete();
    }
}

Спасибо

...