Как я могу обобщить арность функции zip rxjava2 (от Single / Observable) до n аргументов, допускающих значение NULL, без потери ее типов? - PullRequest
7 голосов
/ 11 июля 2020

Две основные проблемы, которые необходимо решить:

1) Проверка типа потеряна

Используя аргумент массива Single.zip() версия, я теряю строго типизированные аргументы.

2) Исходный аргумент не может быть обнуляемым

Я не могу отправлять обнуляемые исходные значения в качестве аргумента Single.zip() функции

3) Мне нужна альтернатива методу, принимающему Object[] не типизированный:

public static <T, R> Single<R> zipArray(Function<? super Object[], ? extends R> zipper, SingleSource<? extends T>... sources) ...

В haskell есть вопрос, связанный с Как я могу реализовать обобщенные «zipn» и «unzipn» в Haskell? :

И в haskell Я могу добиться этого с помощью аппликативных функторов:

f <$> a1 <*> a2 <*> a3 <*> a4 <*> a5 <*> a6 <*> a7 <*> a8 <*> a9 <*> a10 <*> a11

f :: Int -> Int -> Int -> Int -> Int -> Int -> Int -> String -> String -> String -> Int

и a1 .. a11 значения, соответствующие каждому типу

Список похожих функций есть в библиотека:

  • С двумя аргументами:

     public static <T1, T2, R> Single<R> zip(SingleSource<? extends T1> source1, SingleSource<? extends T2> source2,BiFunction<? super T1, ? super T2, ? extends R> zipper) {
         ObjectHelper.requireNonNull(source1, "source1 is null");
         ObjectHelper.requireNonNull(source2, "source2 is null");
         return zipArray(Functions.toFunction(zipper), source1, source2);
     }
    
  • с тремя:

      public static <T1, T2, T3, R> Single<R> zip(
          SingleSource<? extends T1> source1, SingleSource<? extends T2> source2,
          SingleSource<? extends T3> source3,
          Function3<? super T1, ? super T2, ? super T3, ? extends R> zipper)
    

И так далее ...

Во всех этих случаях это нормально, потому что каждый аргумент типизирован. Но есть ограничение до 9 одиночных источников

В нашем проекте нам нужно больше источников, потому что у нас есть много сервисов, которые мы хотим достичь asyn c (в нашем случае было 11 аргументов).

Но проблема в том, что аргументы теряют свои сильные типы, и, что еще хуже, некоторые из них могут быть Nullable

Например, мы хотели решить эту проблему вариант использования:

//Given
val bothSubscribed = CountDownLatch(2) // Change this value to 0 to run the test faster
val subscribeThreadsStillRunning = CountDownLatch(1) // Change this value to 0 to run the test faster

val service = { s1: String,
                s2: Int,
                s3: String?,
                s4: Int,
                s5: String,
                s6: String,
                s7: String,
                s8: String,
                s9: String,
                s10: String?,
                s11: String ->
    val result =
        listOf(s1, "$s2", s3 ?: "none", "$s4", s5, s6, s7, s8, s9, s10 ?: "none", s11).joinToString(separator = ";")
    Single.just("Values:$result")
}

val createSingle = { value: String ->
    Observable
        .create<String> { emitter ->
            println("Parallel subscribe $value on ${Thread.currentThread().name}")
            bothSubscribed.countDown()
            subscribeThreadsStillRunning.await(20, TimeUnit.SECONDS)
            emitter.onNext(value)
            emitter.onComplete()
        }
        .singleOrError()
        .subscribeOn(io())
}

val s1 = createSingle("v1")
val s2 = Single.just(2)
val s3 = null
val s4 = Single.just(4)
val s5 = createSingle("v5")
val s6 = createSingle("v6")
val s7 = createSingle("v7")
val s8 = createSingle("v8")
val s9 = createSingle("v9")
val s10 = null
val s11 = createSingle("v11")

//When

 val result = Single.zipArray(
    listOf(
        s1,
        s2,
        s3,
        s4,
        s5,
        s6,
        s7,
        s8,
        s9,
        s10,
        s11
    )
) { arrayResult ->
    service(
        arrayResult[0] as String,
        arrayResult[1] as String,
        arrayResult[2] as String?,
        arrayResult[3] as String,
        arrayResult[4] as String,
        arrayResult[5] as String,
        arrayResult[6] as String,
        arrayResult[7] as String,
        arrayResult[8] as String,
        arrayResult[9] as String?,
        arrayResult[10] as String
    )
}

//Then
result
    .test()
    .awaitDone(50, TimeUnit.SECONDS)
    .assertSubscribed()
    .assertValues("Values:v1;2;none;4;v5;v6;v7;v8;v9;none;v11")

Как видите, могут возникнуть проблемы, если я это сделаю, например:

arrayResult[0] as String,
arrayResult[1] as Int,
arrayResult[2] as String?,
arrayResult[3] as Int,
arrayResult[4] as String,
arrayResult[5] as String,
arrayResult[6] as String,
arrayResult[7] as String,
arrayResult[8] as String,
arrayResult[9] as String?,
arrayResult[10] as String

Ошибка, потому что:

1) Ни один из * Функции 1064 * могут принимать в качестве аргумента значение, допускающее значение NULL.

2) Вы можете изменить в массиве порядок значений, и это может привести к сбою из-за проверки типов

Ответы [ 2 ]

2 голосов
/ 14 июля 2020

Функция с одиннадцатью параметрами - хороший пример нечистого кода. Вместо этого вам следует подумать о создании модели, отвечающей вашим потребностям. Таким же образом вы можете указать значимые имена для каждого аргумента.

data class MyObject(...)

class MyMutableObject {
    private lateinit var param0: String
    private var param1: Int
    ...

    fun setParam0(value: String) {
        param0 = value
    }
    fun setParam1(value: Int) {
        param1 = value
    }
    ...

    fun toMyObject() = MyObject(
        param0,
        param1,
        ...
    ) 
}

Имея эту модель, вы можете просто использовать оператор zipWith() для каждого из ваших источников.

Single.just(MyMutableObject())
      .zipWith(source0, MyMutableObject::setParam0)
      .zipWith(source1, MyMutableObject::setParam1)
      ...
      .map(MyMutableObject::toMyObject)

Если вы рассматриваете возможность абстрагирования обнуляемости как Maybe, вы можете просто определить функцию расширения, получающую Maybe с данными или без данных, и сопоставить ее соответствующим образом.

inline fun <T, U, R> Single<T>.zipWith(
        other: MaybeSource<U>,
        crossinline zipper: (T, U) -> R
) = other.zipWith(toMaybe()) { t, u -> zipper(t, u) }
         .switchIfEmpty(this)
0 голосов
/ 11 июля 2020

Я достиг этой цели, используя:

  1. Kotlin Функции расширения
  2. Каррированные функции (Kotlin позволяет это)
  3. Частичное приложение (Kotlin это тоже позволяет) * function, для значений, не допускающих NULL:
    /**
     * Returns a Single that is the result of applying the function inside the context (a Single in this case).
     * This function is curried and will be used as an Applicative Functor, so each argument will be given
     * one by one
     * @param <B> the result value type
     * @param applicativeValue
     *            a Single that contains the input value of the function
     * @return the Single returned when the function is applied to the applicative value.
     * Each application will be executed on <b>a new thread</b> if and only if the Single is subscribed on a specific scheduler
     */
    infix fun <A, B> Single<(A) -> (B)>.zipOver(applicativeValue: Single<A>): Single<B> =
        Single.zip(this, applicativeValue, BiFunction { f, a -> f(a) })
    

    Затем, zipOverNullable для значений, допускающих NULL:

    /**
     * Returns a Single that is the result of applying the function inside the context (a Single in this case).
     * This function is curried and will be used as an Applicative Functor, so each argument will be given
     * one by one
     * @param <B> the result value type
     * @param applicativeValue
     *            a Single that contains the input value of the function and it can be null
     * @return the Single returned when the function is applied to the applicative value even when
     * it is null.
     * Each application will be executed on <b>a new thread</b> if and only if the Single is subscribed on a specific scheduler
     */
    infix fun <A, B> Single<(A?) -> (B)>.zipOverNullable(applicativeValue: Single<A>?): Single<B> =
        when {
            applicativeValue != null -> Single.zip(this, applicativeValue, BiFunction { f, a -> f(a) })
            else -> this.map { it(null) }
        }
    

    Я использовал org.funktionale.currying для curried() функция

    Объединив эти два, вы можете написать:

        //Given
        val bothSubscribed = CountDownLatch(0) // Change this value to 2 to run the test slowly
        val subscribeThreadsStillRunning = CountDownLatch(0) // Change this value to 1 to run the test slowly
    
        val service: (String, String, String?, String, String, String, String, String, String, String?, String) -> Single<String> = { 
                        s1: String,
                        s2: Int,
                        s3: String?,
                        s4: Int,
                        s5: String,
                        s6: String,
                        s7: String,
                        s8: String,
                        s9: String,
                        s10: String?,
                        s11: String ->
            val result =
                listOf(s1, "$s2", s3 ?: "none", "$s4", s5, s6, s7, s8, s9, s10 ?: "none", s11).joinToString(separator = ";")
            Single.just("Values:$result")
        }
    
        val createSingle = { value: String ->
            Observable
                .create<String> { emitter ->
                    println("Parallel subscribe $value on ${Thread.currentThread().name}")
                    bothSubscribed.countDown()
                    subscribeThreadsStillRunning.await(20, TimeUnit.SECONDS)
                    emitter.onNext(value)
                    emitter.onComplete()
                }
                .singleOrError()
                .subscribeOn(io())
        }
    
        val s1: Single<String> = createSingle("v1")
        val s2: Single<Int> = Single.just(2)
        // Here, we move the Nullable value outside, so the whole Single<String> is Nullable, and not the value inside the Single`enter code here`
        val s3: Single<String>? = null
        val s4: Single<String> = Single.just(4)
        val s5: Single<String> = createSingle("v5")
        val s6: Single<String> = createSingle("v6")
        val s7: Single<String> = createSingle("v7")
        val s8: Single<String> = createSingle("v8")
        val s9: Single<String> = createSingle("v9")
        val s10: Single<String>? = null
        val s11 = createSingle("v11")
    
        //When
        // Here I curry the function, so I can apply one by one the the arguments via zipOver() and preserve the types 
    
        val singleFunction: Single<(String) -> (String) -> (String?) -> (String) -> (String) -> (String) -> (String) -> (String) -> (String) -> (String?) -> (String) -> Single<String>> =
            Single.just(service.curried()).subscribeOn(io())
    
        val result = singleFunction
            .zipOver(s1)
            .zipOver(s2)
            .zipOverNullable(s3)
            .zipOver(s4)
            .zipOver(s5)
            .zipOver(s6)
            .zipOver(s7)
            .zipOver(s8)
            .zipOver(s9)
            .zipOverNullable(s10)
            .zipOver(s11)
            .flatMap { it }
    
        //Then
        result
            .test()
            .awaitDone(50, TimeUnit.SECONDS)
            .assertSubscribed()
            .assertValues("Values:v1;2;none;4;v5;v6;v7;v8;v9;none;v11")
    

    Затем он напечатает что-то вроде:

    Parallel subscribe v11 on RxCachedThreadScheduler-10
    Parallel subscribe v8 on RxCachedThreadScheduler-8
    Parallel subscribe 4 on RxCachedThreadScheduler-4
    Parallel subscribe v5 on RxCachedThreadScheduler-5
    Parallel subscribe v9 on RxCachedThreadScheduler-9
    Parallel subscribe 2 on RxCachedThreadScheduler-3
    Parallel subscribe v6 on RxCachedThreadScheduler-6
    Parallel subscribe v1 on RxCachedThreadScheduler-2
    Parallel subscribe v7 on RxCachedThreadScheduler-7
    

    Теперь, если я это сделаю:

        val result = singleFunction
            .zipOver(s1)
            .zipOver(s1)
            .zipOverNullable(s3)
            .zipOver(s1)
            .zipOver(s5)
            .zipOver(s6)
            .zipOver(s7)
            .zipOver(s8)
            .zipOver(s9)
            .zipOverNullable(s10)
            .zipOver(s11)
            .flatMap { it }
    

    Он сломается во время компиляции

...