Как изменить мою вспомогательную функцию, чтобы она собирала результаты задач параллельной обработки - PullRequest
0 голосов
/ 06 апреля 2020

Я написал эту вспомогательную функцию, чтобы я мог легко обрабатывать список параллельно и продолжать выполнение кода только после завершения всей работы. Это прекрасно работает, когда вам не нужно возвращать результат.

(я знаю, что не всегда рекомендуется создавать новые пулы, его можно легко удалить, но я хотел чтобы примеры были простыми.)

fun recursiveAction(action: () -> Unit): RecursiveAction {
    return object : RecursiveAction() {
        override fun compute() {
            action()
        }
    }
}

fun <T> List<T>.parallelForEach(parallelSize: Int, action: (T) -> Unit) {
    ForkJoinPool(parallelSize).invoke(recursiveAction {
        this.parallelStream().forEach { action(it) }
    })
}

Пример использования:

val myList: List<SomeClass> [...]
val parallelSize: Int = 8

myList.parallelForEach(parallelSize) { listElement ->
   //Some task here
}

Есть ли способ создать аналогичную вспомогательную конструкцию для случаев, когда вы хотите собрать результаты обратно в список?

Я знаю, что должен использовать RecursiveTask вместо RecursiveAction, но мне не удалось написать вспомогательную функцию, как я делал выше, чтобы обернуть ее.

Я хотел бы использовать это так:

val myList: List<SomeClass> [...]
val parallelSize: Int = 8

val result: List<SomeClass> = myList.parallelForEach(parallelSize) { listElement ->
   //Some task here
}

В качестве альтернативы, есть ли более простой способ сделать это вообще?

1 Ответ

0 голосов
/ 08 апреля 2020

Ответ JeffMurdock более Reddit

fun <T> recursiveTask(action: () -> T): RecursiveTask<T> {
    return object : RecursiveTask<T>() {
        override fun compute(): T {
            return action()
        }
    }
}

fun <T, E> List<T>.parallelForEach(parallelSize: Int, action: (T) -> E): List<E> {
    val pool = ForkJoinPool(parallelSize)
    val result = mutableListOf<ForkJoinTask<E>>()
    for (item in this) {
        result.add(pool.submit(recursiveTask {
            action(item)
        }))
    }
    return result.map { it.join() }
}

fun main(args: Array<String>) {
    val list = listOf(1, 2, 3)
    list.parallelForEach(3) { it + 2 }.forEach { println(it) }
}
...