Как фильтровать строки, имеющие массив структур? - PullRequest
0 голосов
/ 18 июня 2020

У меня есть dataFrame с массивом структуры, поэтому я просто хочу отфильтровать столбцы, или мы можем сказать, выберите столбец в массиве структуры из массива структуры, но возможно ли это, поскольку я повторяю строку. Схема

 root 
     |-- day: long (nullable = true)
     |-- table_row: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |-- DATE: string (nullable = true)
     |    |-- ADMISSION_NUM: string (nullable = true)
     |    |-- SOURCE_CODE: string (nullable = true)

Я делаю итерацию по строкам. Можно ли выбрать столбцы массива по строкам. Я только хочу знать, как это возможно.

def keepColumnInarray(columns: Set[String], row: Row): Row = {
      //Some 
    }

Пример. Если я хочу сохранить столбец «Данные», тогда keepColumnInarray выберет только эту

Схема вывода

 root 
     |-- day: long (nullable = true)
     |-- table_row: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |-- DATE: string (nullable = true)


1 Ответ

0 голосов
/ 18 июня 2020

spark> = 2.4

df.withColumn("table_row", expr("TRANSFORM (table_row, x -> named_struct('DATE', x.DATE))")

это преобразует

table_row: array
struct (nullable = true)
     |    |-- DATE: string (nullable = true)
     |    |-- ADMISSION_NUM: string (nullable = true)
     |    |-- SOURCE_CODE: string (nullable = true)

в

table_row: array
 struct (nullable = true)
     |    |-- DATE: string (nullable = true)

Update-1 (на основе комментариев)

spark <2,4 </h2> Используйте UDF ниже для выбора столбцов - val df = spark.range(2).withColumnRenamed("id", "day") .withColumn("table_row", expr("array(named_struct('DATE', 'sample_date'," + " 'ADMISSION_NUM', 'sample_adm_num', 'SOURCE_CODE', 'sample_source_code'))")) df.show(false) df.printSchema() // // +---+---------------------------------------------------+ // |day|table_row | // +---+---------------------------------------------------+ // |0 |[[sample_date, sample_adm_num, sample_source_code]]| // |1 |[[sample_date, sample_adm_num, sample_source_code]]| // +---+---------------------------------------------------+ // // root // |-- day: long (nullable = false) // |-- table_row: array (nullable = false) // | |-- element: struct (containsNull = false) // | | |-- DATE: string (nullable = false) // | | |-- ADMISSION_NUM: string (nullable = false) // | | |-- SOURCE_CODE: string (nullable = false) // def keepColumnInarray(columnsToKeep: Seq[String], rows: mutable.WrappedArray[Row]) = { rows.map(r => { new GenericRowWithSchema(r.getValuesMap(columnsToKeep).values.toArray, StructType(r.schema.filter(s => columnsToKeep.contains(s.name)))) }) } val keepColumns = udf((columnsToKeep: Seq[String], rows: mutable.WrappedArray[Row]) => keepColumnInarray(columnsToKeep, rows) , ArrayType(StructType(StructField("DATE", StringType) :: Nil))) val processedDF = df .withColumn("table_row_new", keepColumns(array(lit("DATE")), col("table_row"))) processedDF.show(false) processedDF.printSchema() // // +---+---------------------------------------------------+---------------+ // |day|table_row |table_row_new | // +---+---------------------------------------------------+---------------+ // |0 |[[sample_date, sample_adm_num, sample_source_code]]|[[sample_date]]| // |1 |[[sample_date, sample_adm_num, sample_source_code]]|[[sample_date]]| // +---+---------------------------------------------------+---------------+ // // root // |-- day: long (nullable = false) // |-- table_row: array (nullable = false) // | |-- element: struct (containsNull = false) // | | |-- DATE: string (nullable = false) // | | |-- ADMISSION_NUM: string (nullable = false) // | | |-- SOURCE_CODE: string (nullable = false) // |-- table_row_new: array (nullable = true) // | |-- element: struct (containsNull = true) // | | |-- DATE: string (nullable = true) //

...