Читайте CSV с более чем 22 столбцами в Apache Flink - PullRequest
0 голосов
/ 29 августа 2018

То, что я делал до сих пор, читается как CSV следующим образом:

val data = env.readCsvFile[ElecNormNew](getClass.getResource("/elecNormNew.arff").getPath)

val dataSet = data map { tuple =>
      val list = tuple.productIterator.toList
      val numList = list map (_.asInstanceOf[Double])
      LabeledVector(numList(8), DenseVector(numList.take(8).toArray))
    }

Где ElecNorNew является case class:

case class ElecNormNew(
  var date: Double,
  var day: Double,
  var period: Double,
  var nswprice: Double,
  var nswdemand: Double,
  var vicprice: Double,
  var vicdemand: Double,
  var transfer: Double,
  var label: Double) extends Serializable {
}

Как указано в Документы Флинка . Но сейчас я пытаюсь прочитать CSV с 53 столбцами. Есть ли способ автоматизировать этот процесс? Нужно ли создавать POJO с 53 полями?

Обновление

После ответа Фабиана я пытаюсь это сделать:

val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT)
  val rowIF = new RowCsvInputFormat(new Path(getClass.getResource("/lungcancer.csv").getPath), fieldTypes)
  val csvData: DataSet[Row] = env.createInput[Row](rowIF)
  val dataSet2 = csvData.map { tuple =>
      ???
  }

Но не знаете, как продолжить, как я должен использовать RowTypeInfo?

1 Ответ

0 голосов
/ 30 августа 2018

Вы можете использовать RowCsvInputFormat следующим образом:

val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT)

val rowIF = new RowCsvInputFormat(new Path("file:///myCsv"), fieldTypes)
val csvData: DataSet[Row] = env.createInput[Row](rowIF)

Row сохраняет данные в Array[Any]. Поэтому Flink не может автоматически определять типы полей для Row. Это немного сложнее в использовании, чем типизированные кортежи или case-классы. Вам необходимо явно предоставить RowTypeInfo правильные типы. Это можно сделать как неявные значения или с помощью функций, расширяющих интерфейс ResultTypeQueryable.

...