Как добавить индекс, сгруппированный по идентификатору, в Dataframe Spark - PullRequest
2 голосов
/ 25 июня 2019

У меня есть этот Dataframe

+---------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+
|_id      |details__line_items                                                                                                                                                  |searchable_tags|
+---------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+
|307131663|[[line_item_1345678912345678M, {}, {},, loan, 1,, 116000], [line_item_123456789, {}, {},, Test, 1,, 1234567], [line_item_2kZgNnPXvEgnKCAaM, {}, {},, loan, 1,, 1234]]|[]             |
|040013496|[[line_item_1345678912345678M, {}, {},, loan, 1,, 116000], [line_item_123456789, {}, {},, Test, 1,, 1234567], [line_item_2kZgNnPXvEgnKCAaM, {}, {},, loan, 1,, 1234]]|[]             |
+---------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+

Я взрываю столбец details__line_items с помощью этой функции:

 def getArrayDataFrame(df: DataFrame): ListBuffer[DataFrame] = {
    df.schema
      .filter(field => {
        field.dataType.typeName == "array"
      })
      .map(field => {
        val explodeColumn = (colsName: String) =>
          df.withColumn("items", explode(df.col(s"${field.name}")))
            .select("_id", colsName)
        field.dataType match {
          case arrayType: ArrayType => {
            arrayType.elementType.typeName match {
              case "struct" => explodeColumn("items.*")
              case _        => explodeColumn(s"${field.name}")
            }
          }
        }
      })
      .to[ListBuffer]
  }

Я получаю этот Dataframe:

+---------+---------------------------+--------------+---------------+-----------+----+--------+----+----------+
|_id      |_id                        |antifraud_info|contextual_data|description|name|quantity|sku |unit_price|
+---------+---------------------------+--------------+---------------+-----------+----+--------+----+----------+
|307131663|line_item_1345678912345678M|{}            |{}             |null       |loan|1       |null|116000    |
|307131663|line_item_123456789        |{}            |{}             |null       |Test|1       |null|1234567   |
|307131663|line_item_2kZgNnPXvEgnKCAaM|{}            |{}             |null       |loan|1       |null|1234      |
|040013496|line_item_1345678912345678M|{}            |{}             |null       |loan|1       |null|116000    |
|040013496|line_item_123456789        |{}            |{}             |null       |Test|1       |null|1234567   |
|040013496|line_item_2kZgNnPXvEgnKCAaM|{}            |{}             |null       |loan|1       |null|1234      |
+---------+---------------------------+--------------+---------------+-----------+----+--------+----+----------+

Как я могу получить новый Dataframe, подобный этому?

+---------+---+---------------------------+-------------------+--------------------+----------------+---------+-------------+--------+---------------+
|_id      |index|_id                  |antifraud_info|contextual_data|description|name|quantity|sku|unit_price|
+---------+---+---------------------------+-------------------+--------------------+----------------+---------+-------------+--------+---------------+
|307131663|0  |line_item_1345678912345678M|{}                 |{}                  |null            |loan     |1            |null    |116000         |
|307131663|1  |line_item_123456789        |{}                 |{}                  |null            |Test     |1            |null    |1234567        |
|307131663|2  |line_item_2kZgNnPXvEgnKCAaM|{}                 |{}                  |null            |loan     |1            |null    |1234           |
|040013496|0  |line_item_1345678912345678M|{}                 |{}                  |null            |loan     |1            |null    |116000         |
|040013496|1  |line_item_123456789        |{}                 |{}                  |null            |Test     |1            |null    |1234567        |
|040013496|2  |line_item_2kZgNnPXvEgnKCAaM|{}                 |{}                  |null            |loan     |1            |null    |1234           |
+---------+---+---------------------------+-------------------+--------------------+----------------+---------+-------------+--------+---------------+

Я уже пытался использовать posexplode, но он меняет мою схему данных, добавляя столбцы col и pos, я изменил свою функцию следующим образом.

def getArrayDataFrame(df: DataFrame): ListBuffer[DataFrame] = {
    df.schema
      .filter(field => {
        field.dataType.typeName == "array"
      })
      .map{ (field) => {

        println(s"This is the name of the field ${field.name}")

        val testDF =  df.select($"_id", posexplode(df.col(s"${field.name}") ))

        testDF.printSchema()
        val newDF = testDF.select(flattenSchema(testDF.schema): _*)
        newDF.printSchema()
        newDF
      }}
      .to[ListBuffer]
  }

Итак, как я могу получить индекс разнесенного столбца, не изменяя мою схему Dataframe?

1 Ответ

1 голос
/ 26 июня 2019

Чтобы добавить столбец индекса для каждой группы, используйте оконную функцию partitionBy () и функцию row_number из функций spark sql.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

df.withColumn("index", row_number(Window.partitionBy("col_to_groupBy").orderBy("some_col")))

Функция row_number () будет маркировать каждую строку с возрастающим индексом, чтобы сделать это для каждой группы, которую мы используем Оконная функция partitioBy () для группировки DF на основе некоторого столбца ( col_to_groupBy). Каждая группа должна быть упорядочена сама по себе - поэтому мы используем orderBy для упорядочения по некоторому столбцу (some_col). В вашем примере порядок не имеет значения, поэтому вы можете выбрать любой столбец, который вы хотите.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...