TL; DR: можно ли исчерпать потоки при использовании сопрограмм? Что ж, ответ - нет (тупики - другая проблема).Но возможно ли использовать сопрограммы таким образом, чтобы это означало, что ваш параллелизм ограничен числом потоков?Да.
Я думаю, что первое, что вы должны понять, это разница между блокирующей и неблокирующей / приостанавливающей / асинхронной функциями.
Реальная функция приостановки / неблокирования / асинхронности, которая имеет некоторые длительные функциональные возможности, но должным образом обеспечивает контроль выполнения до тех пор, пока эта длительная задача не будет завершена, - это то, как вы действительно используете параллелизм, который вы получаете с сопрограммами.Позвольте мне продемонстрировать.
Несколько сопрограмм с внутренней долго работающей функцией приостановки в 1 потоке
val singleThread = Executors.newFixedThreadPool(1).asCoroutineDispatcher()
fun main() = runBlocking {
val start = System.currentTimeMillis()
val jobs = List(10) {
launch (singleThread){
delay(1000)
print(".")
}
}
jobs.forEach { it.join() }
val end = System.currentTimeMillis()
println()
println(end-start)
}
Здесь у нас есть 10 сопрограмм, которые были запущены в быстрой последовательности1 поток.Все они используют функцию приостановки delay
для имитации длительной задачи, которая занимает 1000 миллисекунд.Но ... все заканчивается за 1018 миллисекунд.Это будет немного странно для тех, кто знаком с чисто параллельным параллелизмом.Объяснение впереди.Но просто для ясности, здесь тот же код, но с использованием Thread.sleep
вместо delay
.
Несколько сопрограмм в 1 потоке с внутренней функцией долгосрочного блокирования
fun main() = runBlocking {
val start = System.currentTimeMillis()
val jobs = List(10) {
launch (singleThread){
Thread.sleep(1000)
print(".")
}
}
jobs.forEach { it.join() }
val end = System.currentTimeMillis()
println()
println(end-start)
}
Этот же бит кода, но с блокировкой Thread.sleep
занял 10027 миллисекунд.Каждая сопрограмма блокировала поток, в котором она была, и поэтому наши 10 сопрограмм фактически выполнялись последовательно.Диспетчеру не было возвращено управление во время выполнения функции длительного выполнения.
Более подробное объяснение различий между неблокирующей приостановкой и блокирующими вызовами от Романа Елизарова здесь
В вашем случае, я подозреваю, что вы осуществляете поиск данных, используя блокирующую библиотеку ввода-вывода.Это означает, что каждый из этих вызовов блокирует поток, в котором он находится, и не передает управление диспетчеру во время выполнения задачи ввода-вывода.
Моя рекомендация:
- Carryпо использованию Dispatchers.IO
- Начните использовать неблокирующую библиотеку для извлечения ваших данных.Я рекомендую клиент ktor http с механизмом CIO.
Но как насчет потери данных при одновременной работе?
Тамздесь недостаточно информации, чтобы быть уверенным, но я думаю, что вы не выстроили свою логику таким образом, чтобы учитывать параллелизм.В действительно параллельном выполнении, смахивание номер 3 может завершиться до завершения смахивания 2 или смахивания 1.Если ваши обновления не являются идемпотентными или вы предоставляете некоторый частичный набор данных с каждым запросом на обновление, то вы можете обрабатывать обновление 3 раньше других и игнорировать обновления 1 и 2, когда они в конечном итоге поступят.