Чтение большого файла HDFS в Flink DataSet - PullRequest
0 голосов
/ 28 октября 2018

Я пытаюсь прочитать большой файл с пользовательским форматом .fvecs из HDFS и в DataSet. Данные общедоступны здесь , что также подробно объясняет формат. Я дам краткое резюме:

Файлы содержат набор векторов, хранящихся в сыром порядке байтов. Каждый вектор занимает 4 + d * 4 байтов, где первые четыре байта каждого вектора являются числами с плавающей точкой, представляющими размерность вектора, которая, как известно, равна d = 128. Следующие d * 4 байты определяют значения в d -мерной точке.

Данные могут содержать свыше одного миллиарда точек, которые никогда не смогут поместиться в памяти, поскольку для этого потребуется как минимум 500 ГБ памяти, даже без дополнительных накладных расходов. Файлы могут быть прочитаны с помощью следующей функции, если размер ввода не слишком велик:

// The Point class is just a wrapper for Vector[Float]
def fi_vecs(path: String): Vector[Point] = {

  val data_in = new DataInputStream(
    new BufferedInputStream(
      new FileInputStream(
        new File(path))))

  // 516 is the number of bytes in a Point.
  // For floating points, we have 4 + 4 * 128
  val tmpArray = ByteBuffer.allocate(516).array
  val buffer = ByteBuffer.wrap(tmpArray)
  buffer.order(ByteOrder.LITTLE_ENDIAN)

  var tempVec = Vector[Point]()
  while (data_in.available > 0) {
    data_in.readFully(tmpArray)
    buffer.rewind()

    val dim = buffer.getInt
    if (dim != 128) throw new IOException("Error: Unexpected dimensionality d = " + dim + " of a feature vector.")

    var vec = Vector[Float]()
    while (vec.size < dim) {
        vec = vec :+ buffer.getFloat
    }
    tempVec = tempVec :+ Point(vec)
  }

  data_in.close()
  tempVec
}

Я пытался внедрить пользовательский считыватель FileInputFormat, но я не уверен, как управлять вызовом функции nextRecord без сохранения полностью прочитанного Vector[Point] непосредственно в памяти.

Любые предложения приветствуются.

...