Очередь приоритетов scala неправильно упорядочена? - PullRequest
4 голосов
/ 16 октября 2011

Я вижу странное поведение с collection.mutable.PriorityQueue у Скалы. Я выполняю внешнюю сортировку и тестирую ее с 1М записями. Каждый раз, когда я запускаю тест и проверяю, результаты между 10-20 записями не сортируются должным образом. Я заменяю реализацию scala PriorityQueue на java.util.PriorityQueue, и она работает 100% времени. Есть идеи?

Вот код (извините, он немного длинный ...). Я тестирую его, используя инструменты gensort -a 1000000 и valsort из http://sortbenchmark.org/

def externalSort(inFileName: String, outFileName: String)
    (implicit ord: Ordering[String]): Int = {

  val MaxTempFiles = 1024
  val TempBufferSize = 4096

  val inFile = new java.io.File(inFileName)

  /** Partitions input file and sorts each partition */
  def partitionAndSort()(implicit ord: Ordering[String]):
      List[java.io.File] = {

    /** Gets block size to use */
    def getBlockSize: Long = {
      var blockSize = inFile.length / MaxTempFiles
      val freeMem = Runtime.getRuntime().freeMemory()
      if (blockSize < freeMem / 2)
        blockSize = freeMem / 2
      else if (blockSize >= freeMem)
        System.err.println("Not enough free memory to use external sort.")
      blockSize
    }

    /** Sorts and writes data to temp files */
    def writeSorted(buf: List[String]): java.io.File = {
      // Create new temp buffer
      val tmp = java.io.File.createTempFile("external", "sort")
      tmp.deleteOnExit()

      // Sort buffer and write it out to tmp file
      val out = new java.io.PrintWriter(tmp)
      try {
        for (l <- buf.sorted) {
          out.println(l)
        }
      } finally {
        out.close()
      }

      tmp
    }

    val blockSize = getBlockSize
    var tmpFiles = List[java.io.File]()
    var buf = List[String]()
    var currentSize = 0

    // Read input and divide into blocks
    for (line <- io.Source.fromFile(inFile).getLines()) {
      if (currentSize > blockSize) {
        tmpFiles ::= writeSorted(buf)
        buf = List[String]()
        currentSize = 0
      }
      buf ::= line
      currentSize += line.length() * 2 // 2 bytes per char
    }
    if (currentSize > 0) tmpFiles ::= writeSorted(buf)

    tmpFiles
  }

  /** Merges results of sorted partitions into one output file */
  def mergeSortedFiles(fs: List[java.io.File])
      (implicit ord: Ordering[String]): Int = {

    /** Temp file buffer for reading lines */
    class TempFileBuffer(val file: java.io.File) {

      private val in = new java.io.BufferedReader(
        new java.io.FileReader(file), TempBufferSize)
      private var curLine: String = ""

      readNextLine() // prep first value

      def currentLine = curLine

      def isEmpty = curLine == null

      def readNextLine() {
        if (curLine == null) return

        try {
          curLine = in.readLine()
        } catch {
          case _: java.io.EOFException => curLine = null
        }

        if (curLine == null) in.close()
      }

      override protected def finalize() {
        try {
          in.close()
        } finally {
          super.finalize()
        }
      }
    }

    val wrappedOrd = new Ordering[TempFileBuffer] {
      def compare(o1: TempFileBuffer, o2: TempFileBuffer): Int = {
        ord.compare(o1.currentLine, o2.currentLine)
      }
    }

    val pq = new collection.mutable.PriorityQueue[TempFileBuffer](
      )(wrappedOrd)

    // Init queue with item from each file
    for (tmp <- fs) {
      val buf = new TempFileBuffer(tmp)
      if (!buf.isEmpty) pq += buf
    }

    var count = 0

    val out = new java.io.PrintWriter(new java.io.File(outFileName))
    try {
      // Read each value off of queue
      while (pq.size > 0) {
        val buf = pq.dequeue()
        out.println(buf.currentLine)
        count += 1
        buf.readNextLine()
        if (buf.isEmpty) {
          buf.file.delete() // don't need anymore
        } else {
          // re-add to priority queue so we can process next line
          pq += buf
        }
      }
    } finally {
      out.close()
    }

    count
  }

  mergeSortedFiles(partitionAndSort())
}

Ответы [ 2 ]

4 голосов
/ 16 октября 2011

Мои тесты не показывают никаких ошибок в PriorityQueue.

import org.scalacheck._
import Prop._

object PriorityQueueProperties extends Properties("PriorityQueue") {
  def listToPQ(l: List[String]): PriorityQueue[String] = { 
    val pq = new PriorityQueue[String]
    l foreach (pq +=)
    pq 
  }
  def pqToList(pq: PriorityQueue[String]): List[String] = 
    if (pq.isEmpty) Nil 
    else { val h = pq.dequeue; h :: pqToList(pq) }

  property("Enqueued elements are dequeued in reverse order") = 
    forAll { (l: List[String]) => l.sorted == pqToList(listToPQ(l)).reverse }

  property("Adding/removing elements doesn't break sorting") = 
    forAll { (l: List[String], s: String) => 
      (l.size > 0) ==> 
      ((s :: l.sorted.init).sorted == { 
        val pq = listToPQ(l)
        pq.dequeue
        pq += s
        pqToList(pq).reverse 
      })
    }
}

scala> PriorityQueueProperties.check
+ PriorityQueue.Enqueued elements are dequeued in reverse order: OK, passed
   100 tests.
+ PriorityQueue.Adding/removing elements doesn't break sorting: OK, passed 
  100 tests.

Если бы вы могли как-то уменьшить ввод, достаточный для создания тестового примера, это помогло бы.

1 голос
/ 18 октября 2011

Я запускал его с пятью миллионами входов несколько раз, выход всегда соответствовал ожидаемому.Судя по вашему коду, я полагаю, что ваш заказ - это проблема (т.е. он дает противоречивые ответы.)

...