Написание собственного класса для HDFS в Apache Flink - PullRequest
0 голосов
/ 12 ноября 2018

Я пытаюсь познакомиться с семантикой Flink после того, как начал с Spark. Я хотел бы записать DataSet[IndexNode] в постоянное хранилище в HDFS, чтобы позже его мог прочитать другой процесс. Spark имеет простой ObjectFile API, который предоставляет такую ​​функциональность, но я не могу найти подобную опцию во Flink.

case class IndexNode(vec: Vector[IndexNode],
                     id: Int) extends Serializable {
  // Getters and setters etc. here
}

Встроенные приемники стремятся сериализовать экземпляр на основе метода toString, который здесь не подходит из-за вложенной структуры класса. Я полагаю, что решение состоит в том, чтобы использовать FileOutputFormat и обеспечить перевод экземпляров в поток байтов. Однако я не уверен, как сериализовать вектор, который имеет произвольную длину и может иметь многоуровневую глубину.

1 Ответ

0 голосов
/ 14 ноября 2018

Вы можете достичь этого, используя SerializedOutputFormat и SerializedInputFormat.

Попробуйте выполнить следующие действия:

  1. Сделать IndexNode расширить IOReadableWritable интерфейс от FLINK. Сделайте несериализуемые поля @transient. Реализуйте метод write(DataOutputView out) и read(DataInputView in). Метод write запишет все данные из IndexNode, а метод read прочитает их обратно и построит все внутренние поля данных. Например, вместо сериализации всех данных из поля arr в классе Result я выписываю все значения, а затем считываю их обратно и перестраиваю массив в методе чтения.

    class Result(var name: String, var count: Int) extends IOReadableWritable {
    
      @transient
      var arr = Array(count, count)
    
      def this() {
        this("", 1)
      }
    
      override def write(out: DataOutputView): Unit = {
        out.writeInt(count)
        out.writeUTF(name)
      }
    
      override def read(in: DataInputView): Unit = {
        count = in.readInt()
    
        name = in.readUTF()
    
        arr = Array(count, count)
      }
    
      override def toString: String = s"$name, $count, ${getArr}"
    
    }
    
  2. Запишите данные с помощью

    myDataSet.write(new SerializedOutputFormat[Result], "/tmp/test")
    

    и прочитайте его обратно

    env.readFile(new SerializedInputFormat[Result], "/tmp/test")
    
...