Обработка многопоточных вычислений большого массива - PullRequest
1 голос
/ 21 июня 2019

У меня большой массив, и я должен выполнять тяжелую работу ЦП для каждого элемента этого массива.

Исходя из моего аналогичного вопроса , г-н Алексей Кайгородов предполагает, что наилучшим способом является разделение вычислений на каждый кусок данных в каждом отдельном потоке.

Вот моя реализация алгоритма, использующая сопрограммы Котлина:

suspend fun predictAll(movingVehicles: List<MovingVehicle>): List<MovingVehicle?> {
    val prevTime = Timestamp(Date().time)
    val nextTime = Timestamp(Date().time)
    val ctx = Dispatchers.Default
    val processors = Runtime.getRuntime().availableProcessors()
    val chunks = movingVehicles.chunked(movingVehicles.count() / processors)
    val s = coroutineScope {
        val res = mutableListOf<Deferred<List<MovingVehicle?>>>()
        for (c in chunks) {
            val r = async(ctx) {
                c.map { predictLocation(it, prevTime, nextTime) }
            }
            res.add(r)
        }
        res.awaitAll()
    }
    return s.flatten()
}

private fun predictLocation(
    mv: MovingVehicle,
    prevTime: Timestamp,
    nextTime: Timestamp,
    relevance: Int = 5
): MovingVehicle?

Это работает, но, может быть, есть лучший подход? Я ищу ExecutorService, но похоже, что ему нужно больше стандартного кода, чем сопрограмм.

1 Ответ

2 голосов
/ 21 июня 2019

Это на самом деле котилинский способ использования сопрограмм. Вы отправляете асинхронные задачи, которые могут выполняться одновременно, а затем ждете их завершения.

Одна пища для размышлений. Все выполнено в потоке. Это означает, что сопрограммы также выполняются в потоке, и если ваша задача блокирует , поток будет заблокирован . Сопрограммы там не спасут. Поэтому часто хорошей идеей является создание Threadpool со свойствами, которые лучше всего подходят для приложения (механика противодавления, количество мин / макс нитей и т. Д.)

Теперь в вашем случае у вас есть задачи, связанные с процессором, вы не можете добиться большей производительности, имея большое количество потоков. Для таких задач практическое применение Amdahl's_law дает -

#threads = #cpu-cores - 1

Сопрограммы по умолчанию поддерживаются общим пулом , который является таким же числом потоков, как упомянуто выше, поэтому, кажется, хорошо сохранить настройки по умолчанию.

Однако несколько библиотек могут использовать этот пул, и если у вас есть задача блокировки ввода-вывода в любой из них, вы потеряете производительность. Я бы порекомендовал создать свой собственный ForkJoinPool и использовать его в качестве диспетчера

val nOfThreads = Runtime.getRuntime().availableProcessors() - 1;
val ctx = ForkJoinPool( if (nOfThreads == 0) then 1 else nOfThreads).asCoroutineDispatcher()
...