Android & Kotlin Coroutines: возможно ли исчерпать темы? - PullRequest
1 голос
/ 13 апреля 2019

Как узнать, нет ли у меня потоков в Android / Kotlin? Я создаю приложение, в котором мне нужно загрузить много данных из удаленного API. Я добавляю логи в свой код, чтобы проверить имя потока, и вижу как минимум 5 рабочих, работающих параллельно. В приложении предусмотрена функция смахивания до обновления, и, если я проведу слишком сильно, после определенного количества вызовов я каким-то образом теряю данные (хотя я не получаю сообщение об ошибке от сервера). Я заметил, что интересующий меня вызов начинается с одного работника, а затем этот работник занят другим процессом. Тогда метод никогда не завершается. Я немного озадачен. Пожалуйста, помогите с любыми предложениями, как решить проблемы многопоточности. Изменение Dispatcher.IO на Dispatcher.Default не имеет большой разницы в поведении. Я могу помещать все сетевые вызовы один за другим (последовательно) - тогда я никогда не потеряю данные, даже если проведу пальцем, чтобы обновить 100 раз. Но тогда все вызовы выполняются в одном и том же рабочем потоке, и я не пользуюсь параллелизмом. : - /

1 Ответ

1 голос
/ 14 апреля 2019

TL; DR: можно ли исчерпать потоки при использовании сопрограмм? Что ж, ответ - нет (тупики - другая проблема).Но возможно ли использовать сопрограммы таким образом, чтобы это означало, что ваш параллелизм ограничен числом потоков?Да.

Я думаю, что первое, что вы должны понять, это разница между блокирующей и неблокирующей / приостанавливающей / асинхронной функциями.

Реальная функция приостановки / неблокирования / асинхронности, которая имеет некоторые длительные функциональные возможности, но должным образом обеспечивает контроль выполнения до тех пор, пока эта длительная задача не будет завершена, - это то, как вы действительно используете параллелизм, который вы получаете с сопрограммами.Позвольте мне продемонстрировать.

Несколько сопрограмм с внутренней долго работающей функцией приостановки в 1 потоке

val singleThread = Executors.newFixedThreadPool(1).asCoroutineDispatcher()

fun main() = runBlocking {
    val start = System.currentTimeMillis()
    val jobs = List(10) {
        launch (singleThread){
            delay(1000)
            print(".")
        }
    }
    jobs.forEach { it.join() }
    val end = System.currentTimeMillis()
    println()
    println(end-start)
}

Здесь у нас есть 10 сопрограмм, которые были запущены в быстрой последовательности1 поток.Все они используют функцию приостановки delay для имитации длительной задачи, которая занимает 1000 миллисекунд.Но ... все заканчивается за 1018 миллисекунд.Это будет немного странно для тех, кто знаком с чисто параллельным параллелизмом.Объяснение впереди.Но просто для ясности, здесь тот же код, но с использованием Thread.sleep вместо delay.

Несколько сопрограмм в 1 потоке с внутренней функцией долгосрочного блокирования

fun main() = runBlocking {
    val start = System.currentTimeMillis()
    val jobs = List(10) {
        launch (singleThread){
            Thread.sleep(1000)
            print(".")
        }
    }
    jobs.forEach { it.join() }
    val end = System.currentTimeMillis()
    println()
    println(end-start)
}

Этот же бит кода, но с блокировкой Thread.sleep занял 10027 миллисекунд.Каждая сопрограмма блокировала поток, в котором она была, и поэтому наши 10 сопрограмм фактически выполнялись последовательно.Диспетчеру не было возвращено управление во время выполнения функции длительного выполнения.

Более подробное объяснение различий между неблокирующей приостановкой и блокирующими вызовами от Романа Елизарова здесь

В вашем случае, я подозреваю, что вы осуществляете поиск данных, используя блокирующую библиотеку ввода-вывода.Это означает, что каждый из этих вызовов блокирует поток, в котором он находится, и не передает управление диспетчеру во время выполнения задачи ввода-вывода.

Моя рекомендация:

  • Carryпо использованию Dispatchers.IO
  • Начните использовать неблокирующую библиотеку для извлечения ваших данных.Я рекомендую клиент ktor http с механизмом CIO.

Но как насчет потери данных при одновременной работе?

Тамздесь недостаточно информации, чтобы быть уверенным, но я думаю, что вы не выстроили свою логику таким образом, чтобы учитывать параллелизм.В действительно параллельном выполнении, смахивание номер 3 может завершиться до завершения смахивания 2 или смахивания 1.Если ваши обновления не являются идемпотентными или вы предоставляете некоторый частичный набор данных с каждым запросом на обновление, то вы можете обрабатывать обновление 3 раньше других и игнорировать обновления 1 и 2, когда они в конечном итоге поступят.

...