Чанк файла в несколько потоков в Scala - PullRequest
0 голосов
/ 07 сентября 2018

В моей программе есть сценарий использования, в котором мне нужно взять файл, разделить их поровну N раз и загрузить их удаленно.

Мне нужна функция, которая принимает, скажем, File и выводит список BufferedReader. На что я могу просто распространять их и отправлять в другую функцию, которая использует некоторый API для их хранения.

Я видел примеры, когда авторы используют .lines() метод BufferedReader:

def splitFile: List[Stream] = {
    val temp = "Test mocked file contents\nTest"
    val is = new ByteArrayInputStream(lolz.getBytes)
    val br = new BufferedReader(new InputStreamReader(is))

    // Chunk the file into two sort-of equal parts.
    // Stream 1
    val test = br.lines().skip(1).limit(1)

    // Stream 2
    val test2 = br.lines().skip(2).limit(1)
    List(test, test2)
}

Полагаю, что приведенный выше пример работает, он не красивый, но работает.

Мои вопросы:

  • Есть ли способ разбить BufferedReader на несколько списков потоков?
  • Я не контролирую формат File, поэтому содержимое файла может быть длиной в одну строку. Разве это не означает, что .lines() просто загрузит все это в Stream одного элемента?

Ответы [ 2 ]

0 голосов
/ 08 сентября 2018

Что ж, если вы не возражаете против чтения всего потока в память, это достаточно просто (при условии, что этот файл содержит текст - поскольку вы говорите о Reader s, но это будет та же идея с двоичным файлом):

 Source.fromFile("filename")
   .mkString
   .getBytes
   .grouped(chunkSize)
   .map { chunk => new BufferedReader(new InputStreamReader(chunk)) }

Но это, похоже, лишает цели цели: если файл достаточно мал для полной загрузки в память, зачем начинать его разделение? Итак, более практичное решение немного сложнее:

  def splitFile(
    input: InputStream, 
    chunkSize: Int
  ): Iterator[InputStream] = new AbstractIterator[InputStream] {
     var hasNext = true
     def next = {
       val buffer = new Array[Byte](chunkSize)
       val bytes = input.read(buffer)
       hasNext = bytes == chunkSize
       new ByteArrayInputStream(buffer, 0, bytes max 0)
     }
  }
0 голосов
/ 08 сентября 2018

Будет намного проще прочитать файл один раз, а затем разделить его.Вы тоже ничего не теряете, так как чтение файла в любом случае является последовательной операцией.Оттуда, просто разработайте схему, чтобы нарезать список.В этом случае я группирую все по его индексу по модулю количества желаемых выходных списков, а затем вытаскиваю списки.Если строк меньше, чем вы просите, каждая строка будет помещена в отдельный List.

val lines: List[String] = br.lines
val outputListQuantity: Int = 2
val data: List[List[String]] = lines.zipWithIndex.groupBy(_._2 % outputListQuantity}.
                                        values.map(_.map(_._1)).toList
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...