Как обработать foreach с многопоточностью? - PullRequest
0 голосов
/ 26 марта 2020

Возможно ли обработать мой forEach с многопоточностью? Потому что это немного сложно для чтения файла с 3 миллионами строк. Может быть, у кого-то есть хорошие знания в Kotlin многопоточности. Заранее спасибо

        var records = arrayListOf<myClass>()

        var mapper = CsvMapper()
        mapper.enable(CsvParser.Feature.WRAP_AS_ARRAY)
        var csvFile = File("myCsv.csv")
        val response = mapper.readerFor(Array<String>::class.java).readValues<Array<String>>(csvFile)

        response.forEach { line ->
                records.add(
                        myClass(
                                id = line[0],
                                name = line[1],
                                secondName= line[2],

                        )
                )
            }

        return records

Ответы [ 2 ]

0 голосов
/ 26 марта 2020

Есть довольно много способов добиться этого с помощью сопрограмм.

//Example 1: Suspend Function
suspend fun loadFiles(): List<YourType> {
    var records = arrayListOf<YourType>()
    var mapper = CsvMapper()
    mapper.enable(CsvParser.Feature.WRAP_AS_ARRAY)
    var csvFile = File("myCsv.csv")
    val response = mapper.readerFor().readValues(csvFile)

    response.forEach { line ->
        records.add(
            yourType(
                id = line[0],
                name = line[1],
                secondName = line[2]
            )
        )
    }
    return records
}

//Example 2 Using Flows
suspend fun loadFilesWithFlow(): Flow<YourType> {
    return flow {
        var mapper = CsvMapper()
        mapper.enable(CsvParser.Feature.WRAP_AS_ARRAY)
        var csvFile = File("myCsv.csv")
        val response = mapper.readerFor(csvFile)

        response.forEach { line ->
            emit(YourType(id = line[0], name = line[1], secondName = line[2]))
        }
    }
}

fun main() = runBlocking {
    //You can use the Default Dispatcher or Create you own thread
    /*
    * However the default dispatcher sends the job to a separate thread 
    * that isn't the main thread. By default the max number of thread 
    * available using Default dispatcher is equal to the number of CPU 
    * cores, most likely it's at-least 2 
    */
    //val scope = CoroutineScope(Dispatchers.Default);
    //Example 1
    launch(Dispatchers.Default) {
        val records = loadFiles()
    }
    //Example 2: using Flows with custom CoroutineContext(ThreadPool)
    launch(newFixedThreadPoolContext(3, "my-default-thread")) {
        loadFilesWithFlow().collect { yourType ->
            //Do something with your data here
        }
    }
}

Вы можете попробовать это, поскольку вы все еще новичок в сопрограммах и потоках.

0 голосов
/ 26 марта 2020

Вы можете использовать

Arrays.stream(response).parallel().forEach(...)

Не забудьте. Теперь вы находитесь в режиме многопоточности и добавляете элементы в список. Убедитесь, что записи являются Threadsafe. Если вы еще не используете

Collections.synchronizedList(records);

РЕДАКТИРОВАТЬ:

Еще лучше, вы можете использовать

map(l -> myClass(id = l[0], name = l[1], secondName = l[2])).collect(Collectors.toList());

Тогда вам даже не нужно создавать список. Просто идея ...

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...