Как читать файлы фиксированной ширины, используя Spark в R - PullRequest
0 голосов
/ 25 марта 2019

Мне нужно прочитать файл с фиксированной шириной 10 ГБ на фрейм данных.Как я могу сделать это, используя Spark в R?

Предположим, мои текстовые данные следующие:

text <- c("0001BRAjonh   ",
"0002USAmarina ",
"0003GBPcharles")

Я хочу, чтобы 4 первых символа были связаны со столбцом "ID"фрейм данных;из символов 5-7 будет связан столбец «Страна»;и из символа 8-14, который будет связан со столбцом «Имя»

Я бы использовал функцию read.fwf, если набор данных был маленьким, но это не так.

Я могу прочитать файл как текстовый файл, используя функцию sparklyr :: spark_read_text.Но я не знаю, как правильно приписать значения файла фрейму данных.

1 Ответ

0 голосов
/ 27 марта 2019

РЕДАКТИРОВАТЬ: Забыл сказать, что подстрока начинается с 1, а массив начинается с 0, по причинам.

Проходя и добавляя код, о котором я говорил в столбце выше.

Процессдинамический и основан на таблице Hive с именем Input_Table.Таблица имеет 5 столбцов: Table_Name, Column_Name, Column_Ordinal_Position, Column_Start и Column_Length.Он является внешним, поэтому любой пользователь может изменять, удалять и удалять любые файлы в папке.Я быстро построил это с нуля, чтобы не брать реальный код, все имеет смысл?

#Call Input DataFrame and the Hive Table. For hive table we make sure to only take correct column as well as the columns in correct order.
val inputDF       = spark.read.format(recordFormat).option("header","false").load(folderLocation + "/" + tableName + "." + tableFormat).rdd.toDF("Odd_Long_Name")
val inputSchemaDF = spark.sql("select * from Input_Table where Table_Name = '" + tableName + "'").sort($"Column_Ordinal_Position")

#Build all the arrays from the columns, rdd to map to collect changes a dataframe col to a array of strings. In this format I can iterator through the column.
val columnNameArray    = inputSchemaDF.selectExpr("Column_Name").rdd.map(x=>x.mkString).collect
val columnStartArray   = inputSchemaDF.selectExpr("Column_Start_Position").rdd.map(x=>x.mkString).collect
val columnLengthArray  = inputSchemaDF.selectExpr("Column_Length").rdd.map(x=>x.mkString).collect

#Make the iteraros as well as other variables that are meant to be overwritten
var columnAllocationIterator = 1
var localCommand             = ""
var commandArray             = Array("") 

#Loop as there are as many columns in input table
while (columnAllocationIterator <= columnNameArray.length) {
  #overwrite the string command with the new command, thought odd long name was too accurate to not place into the code
  localCommand = "substring(Odd_Long_Name, " + columnStartArray(columnAllocationIterator-1) + ", " + columnLengthArray(columnAllocationIterator-1) + ") as " + columnNameArray(columnAllocationIterator-1) 

  #If the code is running the first time it overwrites the command array, else it just appends
  if (columnAllocationIterator==1) {
    commandArray = Array(localCommand)
  } else {
    commandArray = commandArray ++ Array(localCommand)
  }

  #I really like iterating my iterators like this
  columnAllocationIterator = columnAllocationIterator + 1
}

#Run all elements of the string array indepently against the table
val finalDF = inputDF.selectExpr(commandArray:_*)
...