Как конвертировать Flow в Flowable? - PullRequest
1 голос
/ 03 февраля 2020

Я только что добавил

implementation "org.jetbrains.kotlinx:kotlinx-coroutines-rx2:1.3.3"

в проект. И у меня есть suspend fun foo(): Flow<Bar> в классе A (из внешнего пакета).

Мне нужно получить Flowable<Bar> для использования в java. Я хотел бы использовать расширение fun A.fooRx(): Flowable<Bar>, если это возможно.

1 Ответ

1 голос
/ 04 февраля 2020

Вы должны вытащить возвращенные Foo<Bar> из сопрограммы в Kotlin:

// SomeSuspendAPI.kt
// -----------------

// the method to convert
suspend fun <T> Flow<T>.foo() : Flow<Int> {
    return flow { emit(0) }
}

@ExperimentalCoroutinesApi
fun <T> Flow<T>.fooRx() : CompletableFuture<Flowable<Int>> {
    val self = this
    val future = CompletableFuture<Flowable<Int>>()
    GlobalScope.launch {
        try {
            future.complete(self.foo().asFlowable())
        } catch (ex: Throwable) {
            future.completeExceptionally(ex);
        }
    }
    return future
}

// Demo purposes
fun <T> just(v: T) = flow { emit(v) }

Затем вы можете использовать это в Java:

public class UseFoo {
    public static void main(String[] args) throws Exception {
        SomeSuspendAPIKt.fooRx(
                SomeSuspendAPIKt.just(1)
        )
        .thenAccept(flowable -> flowable.subscribe(System.out::println))
        .join();
    }
}

Редактировать 1:

Конечно, вы можете переместить некоторый код обратно на сторону kotlin:

fun <T> Flow<T>.fooRx2() : Flowable<Int> {
    val self = this
    val subject = SingleSubject.create<Flowable<Int>>()
    GlobalScope.launch {
        try {
            subject.onSuccess(self.foo().asFlowable())
        } catch (ex: Throwable) {
            subject.onError(ex)
        }
    }
    return subject.flatMapPublisher { it }
}

Затем

public class UseFoo {
    public static void main(String[] args) throws Exception {
        SomeSuspendAPIKt.fooRx2(SomeSuspendAPIKt.just(1))
                .blockingSubscribe(System.out::println);
    }
}

Редактировать 2:

Вы можете обобщить это, используя преобразование на стороне Kotlin, которое дает вам объект продолжения для передачи:

fun <T, R: Any> Flow<T>.transformAsync(fn: suspend (t: Flow<T>) -> Flow<R>) : Flowable<R> {
    val self = this
    val subject = SingleSubject.create<Flowable<R>>()
    GlobalScope.launch {
        try {
            val r = fn(self).asFlowable();
            subject.onSuccess(r)
        } catch (ex: Throwable) {
            subject.onError(ex)
        }
    }
    return subject.flatMapPublisher { it }
}
public class UseFoo {
    public static void main(String[] args) throws Exception {

        SomeSuspendAPIKt.transformAsync(
                SomeSuspendAPIKt.just(1),
                (source, cont) -> SomeSuspendAPIKt.foo(source, cont)
        )
        .blockingSubscribe(System.out::println);
    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...