Чтение и копирование файла с помощью сопрограмм - PullRequest
1 голос
/ 20 июня 2020

Я создал следующее приложение, чтобы проиллюстрировать некоторые сомнения. Мой пример на Github

В этом примере я копирую файл в другой пакет.

Мои сомнения следующие:

  1. Выполняя задачи параллельно, можно ли вернуть значения, которые были выполнены до отмены?

  2. Почему в contentResolver.openInputStream (uri) появляется сообщение «Неуместное» вызов метода блокировки », пока я работаю с контекстом ввода-вывода?

  3. Пока я читаю запись файла, которую нужно скопировать на вывод, я всегда проверяю статус задания, чтобы при выполнении этой задачи отменен, он немедленно останавливается, выходной файл, который был создан, удаляется и возвращает исключение отмены, это правильно?

  4. Могу ли я ограничить количество выполняемых задач?

My onCreate:

private val listUri = mutableListOf<Uri>()
private val job = Job()

override fun onCreate(savedInstanceState: Bundle?) {
    super.onCreate(savedInstanceState)
    setContentView(R.layout.activity_main)
    
    //get files from 1 to 40
    val packageName = "android.resource://${packageName}/raw/"
    for (i in 1..40) {
        listUri.add(Uri.parse("${packageName}file$i"))
    }
}

Действие моей кнопки:

  //Button action
   fun onClickStartTask(view: View) {
        var listNewPath = emptyList<String>()
        CoroutineScope(Main + job).launch {
            try {
                //shows something in the UI - progressBar
                withContext(IO) {
                    listNewPath = listUri.map { uri ->
                        async {
                            //path to file temp
                            val pathFileTemp =
                                "${getExternalFilesDir("Temp").toString()}/${uri.lastPathSegment}"
                            val file = File(pathFileTemp)
                            val inputStream = contentResolver.openInputStream(uri)
                            inputStream?.use { input ->
                                FileOutputStream(file).use { output ->
                                    val buffer = ByteArray(1024)
                                    var read: Int = input.read(buffer)
                                    while (read != -1) {
                                        if (isActive) {
                                            output.write(buffer, 0, read)
                                            read = input.read(buffer)
                                        } else {
                                            input.close()
                                            output.close()
                                            file.deleteRecursively()
                                            throw CancellationException()
                                        }
                                    }
                                }
                            }
                            //If completed then it returns the new path.
                            return@async pathFileTemp
                        }
                    }.awaitAll()
                }
            } finally {
                //shows list complete in the UI
            }
        }
    }

Моя кнопка для отмены задания:

fun onClickCancelTask(view: View) {
    if (job.isActive) {
        job.cancelChildren()
        println("Cancel children")
    }
}

Это будет действие кнопки для выполнения задачи.

Благодарю всех за помощь.

Ответы [ 2 ]

1 голос
/ 22 июня 2020

Ответы 1. и 4.:

Чтобы разграничить параллельные задачи и дать им возможность завершиться sh независимо (получение некоторых значений при отмене остальных), вам потребуется использовать канал и, желательно, Поток. Упрощенный пример:

fun processListWithSomeWorkers(list: List<Whatever>, concurrency: Int): Flow<Result> = channelFlow {
   val workToDistribute = Channel<Whatever>()
   launch { for(item in list) workToDistribute.send(item) } // one coroutine distributes work...

    repeat(concurrency) { // launch a specified number of worker coroutines
      launch { 
         for (task in workToDistribute) { // which process tasks in a loop
            val atomicResult = process(task)
            send(atomicResult) // and send results downstream to a Flow
         }
      }
   }
}

И затем вы можете обрабатывать результаты один за другим, поскольку они создаются, ожидая, пока весь поток достигнет sh или, например, просто возьмите некоторые из них, когда это необходимо: resultFlow.take(20).onEach { ... }.collectIn(someScope) Поскольку это Поток, он начнет работать только тогда, когда кто-нибудь начнет собирать (он холодный), что обычно хорошо.

Целое можно сделать немного короче, так как вы обнаружите некоторые более подробные c и экспериментальные функции (как производят). Его можно обобщить как оператор Flow следующим образом:

fun <T, R> Flow<T>.concurrentMap(concurrency: Int, transform: suspend (T) -> R): Flow<R> {
    require(concurrency > 1) { "No sense with concurrency < 2" }
    return channelFlow {
        val inputChannel = produceIn(this)
        repeat(concurrency) {
            launch {
                for (input in inputChannel) send(transform(input))
            }
        }
    }
}

и использовать: list.asFlow().concurrentMap(concurrency = 4) { <your mapping logic> }

Команда corotuines думает о добавлении семейства параллельных операторов в потоки Flow, но их еще нет, AFAIK.

0 голосов
/ 22 июня 2020

Думаю, так лучше

fun onClickStartTask(view: View) {
    var listNewPath = emptyList<String>()
    val copiedFiles = mutableListOf<File>()
    CoroutineScope(Dispatchers.Main + job).launch {
        try {
            //shows something in the UI - progressBar
            withContext(Dispatchers.IO) {
                listNewPath = listUri.map { uri ->
                    async {
                        //path to file temp
                        val pathFileTemp =
                                "${getExternalFilesDir("Temp").toString()}/${uri.lastPathSegment}"
                        val file = File(pathFileTemp)
                        val inputStream = contentResolver.openInputStream(uri)
                        inputStream?.use { input ->
                            file.outputStream().use { output ->
                                copiedFiles.add(file)
                                input.copyTo(output, 1024)
                            }
                        }

                        //If completed then it returns the new path.
                        return@async pathFileTemp
                    }
                }.awaitAll()
            }
        } finally {
            //shows list complete in the UI
        }
    }
    job.invokeOnCompletion {
        it?.takeIf { it is CancellationException }?.let {
            GlobalScope.launch(Dispatchers.IO) {
                copiedFiles.forEach { file ->
                    file.delete()
                }
            }
        }
    }
}
...