Как преобразовать любой текстовый файл с разделителями в паркет / avro - динамически изменяя номер столбца / структуру в avro / parquet с помощью spark sql? - PullRequest
0 голосов
/ 02 октября 2019

Нам нужно ежедневно преобразовывать текстовые данные в паркет / авро, когда входные данные поступают из нескольких источников, и имеют разную структуру, для которой мы хотели бы использовать scala-код на основе spark sql, независимо от разделителя и количества столбцов или структуры.

Ответы [ 2 ]

1 голос
/ 04 октября 2019

После анализа вашей проблемы я делаю следующие предположения:

1. data source can be anything, primarily HDFS
2. delimiter can be anything
3. you're maintaining  structure for each source. 
4. file does not contains header

Предложение: здесь проблема в том, что вы должны сгенерировать StructType, если ваши данные не содержат заголовок. Придумайте некоторую структуру, которая может быть структурой json для определения вашего источника данных. Затем загрузите и проанализируйте JSON, используя Jackson, используя Scala. Или просто передайте column_map в вашу программу.

Example: 
{
    "inputLocation": "",
    "delimiter" : ",",
    "column_map" : "col1, datatype; col12, datatype;col1, datatype; col12, datatype"
    "outputLocation": ""
}

Теперь используйте column_map для динамического создания типа структуры.

object GenerateStructType {

  import org.apache.spark.sql.types.{BooleanType, StringType, StructField, StructType}

  def generateStrucType(columnsList: Seq[String]): StructType = {

    val res=columnsList.map( columnDetail => {
      val  columnName = columnDetail.split(",")(0).trim
      val  columnType = columnDetail.split(",")(1).trim

      columnType match {
        case "String" => StructField(columnName,StringType,true)
        case "Bool" => StructField(columnName,BooleanType,true)
        case _ => StructField(columnName,StringType,true)

      }
    })
    StructType(res)
  }

  def main(args: Array[String]): Unit = {
    val columnMap=  "col1, datatype; col12, datatype;col1, datatype; col12, datatype"

    val result= GenerateStructType.generateStrucType(    columnMap.split(";"))
    println(result)
  }

}

динамически сгенерированного StructType :

StructType(StructField(col1,StringType,true), StructField(col12,StringType,true), StructField(col1,StringType,true), StructField(col12,StringType,true))

используйтеТип структуры при загрузке данных.

Надеюсь, это поможет ....

0 голосов
/ 05 октября 2019

Я написал этот код в spark 2.1.0 - Spark SQL

Используемый ввод

1238769|Michael|Hoffman|50000|New York
1238769|Michael1|Hoffman1|50000|New York1
1238770|Michael2|Hoffman2|50000|New York2
1238771|Michael3|Hoffman3|50000|New York3
1238772|Michael4|Hoffman4|50000|New York4
1238773|Michael5|Hoffman5|50000|New York5
1238774|Michael6|Hoffman6|50000|New York6
1238775|Michael7|Hoffman7|50000|New York7
1238776|Michael8|Hoffman8|50000|New York8
1238777|Michael9|Hoffman9|50000|New York9

В этом примере я собираюсь преобразовать текстовый файл трубы ("|")в паркет

Шаг # 1: чтение входных переменных

//creating spark session
val spark = SparkSession.builder().appName("Text to Parquet").master("local[*]").getOrCreate()
import spark.implicits._

//Assigning values to the variables
val input_location = args(0).trim.toString()
val delimiter = "\\|" //You can make it dynamic by passing it as an argument
val selectColString_location = args(1).trim().toString()
val output_location = args(2).trim().toString()

Шаг # 2: чтение входных текстовых данных и разбиение по разделителю

//Reading data from text file
val input_rdd = spark.sparkContext.textFile(input_location)

//Split the input data using the delimiter(we are suing pipe(\\|) as delimiter for this example)
val input_array_rdd:RDD[Array[String]] = input_rdd.map(x => x.split(delimiter, -1))

Шаг# 3: Преобразование rdd, созданного на шаге 2, в фрейм данных с использованием toDF только с одним столбцом - col, который будет столбцом массива

//Converting input_array_rdd into dataframe with only one column - col
val input_df:DataFrame = input_array_rdd.toDF("col")

//Creating temp table on top of input_df with the name TABLE1
input_df.createOrReplaceTempView("TABLE1")

Шаг # 4: Подготовка оператора select согласно структуре вводаиспользуя временную таблицу - TABLE1 и столбец массива - col и сохраняя ее в текстовом файле в виде одной строки

select cast(col[0] as bigint) as cust_id, col[1] as first_name, col[2] as last_name, cast(col[3] as decimal(18,6)) as amount, col[4] as city from table1

Шаг # 5: чтение оператора select из файла и выполнение его для генерации вывода

//Reading the selectColString, remember we are reading only the first row from the file
//Select SQL should be only one row in the selectColString.txt file
val sqlColString = spark.sparkContext.textFile(selectColString_location).first().toString()
//Generating the output using the colString
val output_df = spark.sql(sqlColString)

Шаг № 6: Запись вывода в виде паркета

output_df.write.mode(SaveMode.Overwrite).parquet(output_location)

Схема вывода паркета

root
 |-- cust_id: long (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- amount: decimal(18,6) (nullable = true)
 |-- city: string (nullable = true)

С помощью этой единственной программы мы можем преобразовать весь наш текстфайлы для паркета, просто изменив файл selectColString как per введенный текст.

Ссылка на код Github: https://github.com/sangamgavini/ReusableCodes/tree/master/src/main/scala/com/sangam/TexttoParquet

...