Приостановленная функция для чтения из InputStream - PullRequest
0 голосов
/ 10 января 2019

Я довольно новичок в сопрограммах, поэтому я хотел спросить мнение.

Я создал функцию расширения для чтения данных из InputStream:

suspend fun InputStream.readData(): ByteArray {
    return withContext(Dispatchers.IO) {
        while (available() == 0) {
            delay(10)
        }
        val count = available()
        val buffer = ByteArray(count)
        read(buffer, 0, count)
        return@withContext buffer
    }
}

Как вы думаете, могу ли я что-то улучшить с точки зрения сопрограмм?

Ответы [ 3 ]

0 голосов
/ 10 января 2019
while (available() == 0) {
    delay(10)
}

Здесь вы надеетесь, что достигли неблокирующего ввода-вывода с помощью InputStream. Вы представляете, что данные будут каким-то образом «накапливаться» сами по себе, и вы можете просто подождать, пока они станут доступными, чтобы вы могли получить их, не блокируя при последующем вызове read().

Это поведение не универсально для любого InputStream. На самом деле, он, вероятно, работает только с SocketInputStream и там также есть проблемы: когда удаленный конец закрыл соединение, он будет возвращать 0, пока вы не сделаете еще один вызов read, чтобы убедиться, что сокет закрыт .

В других реализациях InputStream, available() всегда будет возвращать 0, если поток не буферизован, в этом случае он просто скажет вам, сколько осталось в буфере. Когда буфер пуст, реализация входного потока не будет пытаться получить больше данных из базового ресурса, пока вы не вызовете read().

Поэтому я бы предложил по крайней мере сузить приемник вашей функции до SocketInputStream, но для полной корректности вы должны использовать вместо этого код NIO.

Наконец, если вы обнаружите, что для вашего конкретного случая использования цикл available() работает должным образом и read() никогда не блокируется, то вам следует отбросить withContext(IO), поскольку это подразумевает два дорогостоящих переключения контекста (в фоновый поток и обратно), и его целью является только запуск блокирования кода из потока GUI.

0 голосов
/ 11 января 2019

В дополнение к ответу Марко я хотел бы отметить, что тот факт, что вы не можете превратить свой блокирующий код в неблокирующий, просто используя сопрограммы, не означает, что вы не должны использовать сопрограммы вообще. Имеет смысл использовать их для получения других преимуществ:

  1. Позволяет сохранить последовательный стиль для асинхронного кода. Если у вас есть несколько шагов для выполнения всей задачи, вам не нужно использовать специальные реактивные типы и их объединители.
  2. Обеспечивает отличный способ масштабирования выполнения задач. В случае нескольких задач вы можете выполнять их конструктивно в нескольких контекстах и ​​разными диспетчерами.

Надеюсь, это поможет понять всю картину.

0 голосов
/ 10 января 2019

Ваш код выглядит нормально с точки зрения сопрограмм, улучшать нечего. Просто вызовите функцию из конструктора сопрограмм: launch - если вы хотите параллелизм или async - если вы хотите параллелизм. Например:

yourScope.launch {

    val inputStream = BufferedInputStream(FileInputStream("filename"))
    val result = inputStream.use {
        it.readData()
    }

    // use ByteArray result
}

Дополнительно вы можете немного уменьшить свой код, заменив return@withContext buffer на buffer и переместив withContext(Dispatchers.IO) из блока функции:

suspend fun InputStream.readData(): ByteArray = withContext(Dispatchers.IO) {
    while (available() == 0) {
        delay(10)
    }
    val count = available()
    val buffer = ByteArray(count)
    read(buffer, 0, count)
    buffer
}
...