Я пытаюсь преобразовать следующий код RxJava 1 в RxJava2
Observable<PGNotification> observe(String channel, long pollingPeriod) {
return Observable.<PGNotification>create(subscriber -> {
try {
Connection connection = DriverManager
.getConnection("jdbc:postgresql:db");
subscriber.add(Subscriptions.create(() ->
closeQuietly(connection)));
listenOn(connection, channel);
Jdbc42Connection pgConn = (Jdbc42Connection) connection;
pollForNotifications(pollingPeriod, pgConn)
.subscribe(Subscribers.wrap(subscriber));
} catch (Exception e) {
subscriber.onError(e);
}
}).share();
}
Моя проблема в том, что мне не хватает методов subscriber.add и Subscribeers.wrap.
Чтобы устранить недостающие методы в subscriber.add, я понял, что существует некоторый конфликт имен со спецификацией реактивных потоков, и что он был переименован в ResourceSubscription, но я не смог найти пример того, как использовать его в контекстеObservable.create?