Сопрограмма: отложенные операции в списке выполняются последовательно. - PullRequest
0 голосов
/ 10 ноября 2018

У меня есть List параметров для выполнения загрузки. Я отображаю элементы этого списка в Deferred, которые выполняют загрузку; затем, forEach элемент списка, я называю await, но, очевидно, загрузки выполняются последовательно.

Это моя функция:

suspend fun syncFiles() = coroutineScope {
    remoteRepository.requiredFiles()
        .filter { localRepository.needToDownload( it.name, it.md5 ) }
        .map { async { downloader( it ) } }
        .forEach { deferredResult ->
​
            when ( val result = deferredResult.await() ) {
                is DownloadResult.Layout ->  localRepository.storeLayout( result.content )
                is DownloadResult.StringR -> localRepository.storeFile( result )
            }
        }
}

Это мой тест:

private val useCase = SyncUseCaseImpl.Factory(
        mockk { // downloader
            coEvery { this@mockk.invoke( any() ) } coAnswers { delay(1000 );any() }
        },
        ...
    ).newInstance()
​
@Test
fun `syncFiles downloadConcurrently`() = runBlocking {
    val requiredFilesCount = useCase.remoteRepository.requiredFiles().size
    assert( requiredFilesCount ).isEqualTo( 3 )
​
    val time = measureTimeMillis {
        useCase.syncFiles()
    }
​
    assert( time ).isBetween( 1000, 1100 )
}

И вот мой результат: expected to be between:<1000L> and <1100L> but was:<3081L>

Я думаю, что это странно, потому что эти 2 фиктивных теста завершены правильно, может быть, я что-то упустил (?)

@Test // OK
fun test() = runBlocking {
    val a = async { delay(1000 ) }
    val b = async { delay(1000 ) }
    val c = async { delay(1000 ) } ​
    val time = measureTimeMillis {
        a.await()
        b.await()
        c.await()
    } ​
    assert( time ).isBetween( 1000, 1100 )
} ​

@Test // OK
fun test() = runBlocking {
    val wasteTime: suspend () -> Unit = { delay(1000 ) }
    suspend fun wasteTimeConcurrently() = listOf( wasteTime, wasteTime, wasteTime )
            .map { async { it() } }
            .forEach { it.await() } ​
    val time = measureTimeMillis {
        wasteTimeConcurrently()
    } ​
    assert( time ).isBetween( 1000, 1100 )
}

Ответы [ 2 ]

0 голосов
/ 11 ноября 2018

Это может произойти, если задание блокирует весь поток, например, задачи, связанные с вводом-выводом, которые блокируют выполнение всего потока, таким образом блокируя все другие сопрограммы в этом потоке. Если вы используете Kotlin JVM, попробуйте вызвать async(IO) { }, чтобы запустить программу в диспетчере ввода-вывода, чтобы среда программы теперь знала, что это задание заблокирует весь поток и будет вести себя соответствующим образом.

Посмотрите здесь других диспетчеров: https://kotlinlang.org/docs/reference/coroutines/coroutine-context-and-dispatchers.html#dispatchers-and-threads

0 голосов
/ 10 ноября 2018

Проблема в mockk

Если вы посмотрите на код функции coAnswer, вы найдете это (API.kt + InternalPlatformDsl.kt):

infix fun coAnswers(answer: suspend MockKAnswerScope<T, B>.(Call) -> T) = answers {
    InternalPlatformDsl.runCoroutine {
        answer(it)
    }
}

И runCoroutine выглядит следующим образом.

actual fun <T> runCoroutine(block: suspend () -> T): T {
    return runBlocking {
        block()
    }
}

Как видите, coAnswer не является функцией приостановки и запускает новую сопрограмму с runBlocking.

Давайте рассмотрим пример:

val mock =  mockk<Downloader> {
    coEvery {
        this@mockk.download()
    } coAnswers {
        delay(1000)
    }
}

val a = async {
    mock.download()
}

Когда mockk выполняет coAnswer -блок (delay()), он запускает искусственную область сопрограммы, выполняет данный блок и ждет(блокирует текущий поток: runBlocking), пока этот блок не закончится.Таким образом, блок ответа возвращается только после завершения delay(1000).

Означает, что все сопрограммы, запущенные с coAnswer, выполняются последовательно.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...