Scala Dataframe / SQL: Dynami c Выбор столбца для отчетов - PullRequest
0 голосов
/ 15 марта 2020

Контекст :

У нас есть модель, которая имеет 5 потоков.

  • Поток-1: входные данные из разных источников, они вводятся в поток -2
  • Flow-2, Flow-3 и Flow-4: модели ML, каждая из которых сбрасывает несколько полей в s3
  • Flow-5: Уровень отчетности с данными из выходных данных Flow-2, Flow-3, Flow-4
  • Общий размер данных очень мал

Проблема :

  • Flow-5, который Уровень отчетности на основе нескольких SQL с входными данными, поступающими из Flow-2, Flow-3 и Flow-4.
  • Flow-2, Flow-3 и Flow-4 имеет одно общее поле объединения и остальные поля различны.

  • Мы можем создать SQL объединение данных Flow-2,3,4, хранящихся в трех разных таблицах с несколькими расчетами / агрегацией. Однако количество выходных данных поля из Flow-2,3,4 могут меняться при каждом запуске.

    • Проблема-1: каждый раз, когда изменяется структура файла s3 (Flow-2/3/4), что часто приводит к проблеме во время КОПИЯ, как цель Схема таблицы отличается от структуры файла s3 (чтобы исправить, необходимо вручную добавить / удалить поля в целевой таблице, выровнять по данным s3)
    • Проблема-2: для любых добавлений / удалений в файлах s3 необходимо сделать Изменения в отчетах путем добавления / удаления столбца

Подход :

  • SQL способ - стандартизировать s3 dump / target table / report SQL т.е. стандартизировать количество возможных столбцов в каждом потоке (2,3,4), а также в целевой таблице, так что если какие-либо поля недоступны, просто загрузите их как NULL / blank во время дампа s3 и COPY как пусто. Стандартизировать структуру целевой таблицы в соответствии с шаблоном s3. Также стандартизируйте отчетность SQL

  • SCALA / SPARK: В настоящее время изучается эта опция. чтобы выполнить Po C, создал два дампа s3, создал два фрейма данных в scala, пробовал соединения с фреймами данных, а также спарк SQL. Я до сих пор не уверен, есть ли в любом случае, мы можем динамически выбирать новые столбцы ie, делая искровой код обобщенным c.

    • с созданием фреймов данных, непосредственно указывающих на s3, мы можем решить Копировать данные (поля Dynami c) для решения проблемы с таблицей.

    • Однако проблема с отчетностью SQL все еще сохраняется (или, по крайней мере, я не знаю, нужно найти способ, как можно это будет обработано)

Вопрос :

Есть ли в любом случае, мы можем решить эту проблему (Dynami c выбор колонки в SQL) в Scala / Spark SQL? Примечание: файл s3 всегда содержит только обязательные поля для каждого прогона. Вопрос не в том, чтобы прочитать динамические c поля из s3 (которые будут автоматически обработаны с помощью фрейма данных, вместо этого вопрос в том, как мы можем сделать SQL (saprkSQL / API) код, чтобы справиться с этим)

Пример / Сценарий-1 :

  • Run-1

    • df1 = страна, 1st_buy (здесь датафрейм напрямую указывает на s3, который имеет только обязательные атрибуты)

    • df2 = страна, процент (здесь датафрейм напрямую указывает на s3, который имеет только обязательные атрибуты)

--Sample SQL code (need to convert to sparkSQL/dataframe API)
SELECT df1.1st_buy,
       df1.percent/df2.percent -- DERIVED FIELD
FROM df1,df2
WHERE df1.country=df2.country
  • Run-2 (здесь один дополнительный столбец был добавлено в df1)

    • df1 = страна, 1st_buy, 2nd_buy (здесь фрейм данных напрямую указывает на s3, который имеет только обязательные атрибуты)

    • df2 = страна, процент (здесь датафрейм напрямую указывает на s3, который имеет только обязательные атрибуты)

****how can we handle this part of new field 2nd_buy dynamically****

--Sample SQL code (need to convert to sparkSQL/dataframe API)
SELECT df1.1st_buy,
       df1.2nd_buy,
       df1.1st_buy/df2.percent -- DERIVED FIELD
       df1.2nd_buy/df2.percent -- DERIVED FIELD
FROM df1,df2
WHERE df1.country=df2.country

Экзамен ple / Scenario-2 :

  • Run-1

    • df1 = страна, 1st_buy (здесь датафрейм напрямую указывает на s3, который имеет только обязательные атрибуты)
--Sample SQL
SELECT country,sum(df1.1st_buy)
FROM df1
GROUP BY country

--Dataframe API/SparkSQL
df1.groupBy("country").sum("1st_buy").show()


  • Run-2 (здесь один дополнительный столбец был добавлен к df1 )

    • df1 = страна, 1st_buy, 2nd_buy (здесь фрейм данных напрямую указывает на s3, который имеет только обязательные атрибуты)
****how can we handle this part of new field 2nd_buy dynamically****

--Sample SQL
SELECT country,sum(df1.1st_buy),sum(2nd_buy)
FROM df1
GROUP BY country

--Dataframe API/SparkSQL
df1.groupBy("country").sum("1st_buy","2nd_buy").show() 

Ответы [ 2 ]

1 голос
/ 22 марта 2020

{Примеры:

package spark

import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions.{col, column, sum}

object DynamicColumnSelection extends App {

  val spark = SparkSession.builder()
    .master("local")
    .appName("DataFrame-example")
    .getOrCreate()

  import spark.implicits._

  case class c1(
     country: String,
     st_buy: Double,
     nd_buy: Double
  )

  case class c2(
     country: String,
     percent: Double
  )

val df1 = Seq(
  c1("UA", 2, 4),
  c1("PL", 3, 6),
  c1("GB", 4, 8)
  )
  .toDF()

  df1.show(false)
//  +-------+------+------+
//  |country|st_buy|nd_buy|
//  +-------+------+------+
//  |UA     |2.0   |4.0   |
//  |PL     |3.0   |6.0   |
//  |GB     |4.0   |8.0   |
//  +-------+------+------+

  val df2 = Seq(
    c2("UA", 2.21),
    c2("PL", 3.26)
  )
    .toDF()
  df2.show(false)
//  +-------+-------+
//  |country|percent|
//  +-------+-------+
//  |UA     |2.21   |
//  |PL     |3.26   |
//  +-------+-------+


  // Inner Join
  val df = df1.join(df2, df1.col("country") === df2.col("country"), "inner")
      .select(
        df1.col("country"),
        df1.col("st_buy"),
        df1.col("nd_buy"),
        df2.col("percent")
      )
  df.show(false)
//  +-------+------+------+-------+
//  |country|st_buy|nd_buy|percent|
//  +-------+------+------+-------+
//  |UA     |2.0   |4.0   |2.21   |
//  |PL     |3.0   |6.0   |3.26   |
//  +-------+------+------+-------+


  val res1DF = df.withColumn("st_buy_percent", 'st_buy/'percent)
    .withColumn("nd_buy_percent", 'nd_buy/'percent)

  res1DF.show(false)
//  +-------+------+------+-------+------------------+------------------+
//  |country|st_buy|nd_buy|percent|st_buy_percent    |nd_buy_percent    |
//  +-------+------+------+-------+------------------+------------------+
//  |UA     |2.0   |4.0   |2.21   |0.9049773755656109|1.8099547511312217|
//  |PL     |3.0   |6.0   |3.26   |0.9202453987730062|1.8404907975460123|
//  +-------+------+------+-------+------------------+------------------+


  // GroupBy + sum
  val data = Seq(
    c1("UA", 2, 4),
    c1("PL", 3, 6),
    c1("UA", 5, 10),
    c1("PL", 6, 12),
    c1("GB", 4, 8)
  )
    .toDF()

  val resGroupByDF = data
    .groupBy("country")
    .agg(sum("st_buy").alias("sum_st_buy")
    ,sum("nd_buy").alias("sum_nd_buy"))

  resGroupByDF.show(false)
//  +-------+----------+----------+
//  |country|sum_st_buy|sum_nd_buy|
//  +-------+----------+----------+
//  |UA     |7.0       |14.0      |
//  |PL     |9.0       |18.0      |
//  |GB     |4.0       |8.0       |
//  +-------+----------+----------+


  val resGroupByDF1 = data.groupBy($"country").sum()
  resGroupByDF1.show(false)
//  +-------+-----------+-----------+
//  |country|sum(st_buy)|sum(nd_buy)|
//  +-------+-----------+-----------+
//  |UA     |7.0        |14.0       |
//  |PL     |9.0        |18.0       |
//  |GB     |4.0        |8.0        |
//  +-------+-----------+-----------+


  val exprs = data.columns.map(sum(_))
  val resGroupByDF2 = data.groupBy($"country").agg(exprs.head, exprs.tail: _*)
  resGroupByDF2.show(false)
//  +-------+------------+-----------+-----------+
//  |country|sum(country)|sum(st_buy)|sum(nd_buy)|
//  +-------+------------+-----------+-----------+
//  |UA     |null        |7.0        |14.0       |
//  |PL     |null        |9.0        |18.0       |
//  |GB     |null        |4.0        |8.0        |
//  +-------+------------+-----------+-----------+

  val exprs3 = List("st_buy", "nd_buy").map(sum(_))
  val resGroupByDF3 = data.groupBy($"country").agg(exprs3.head, exprs3.tail: _*)
  resGroupByDF3.show(false)
//  +-------+-----------+-----------+
//  |country|sum(st_buy)|sum(nd_buy)|
//  +-------+-----------+-----------+
//  |UA     |7.0        |14.0       |
//  |PL     |9.0        |18.0       |
//  |GB     |4.0        |8.0        |
//  +-------+-----------+-----------+


  val exprs4 = data.columns.toList.filter(_ != "country").map(sum(_))
  val resGroupByDF4 = data.groupBy($"country").agg(exprs4.head, exprs4.tail: _*)
  resGroupByDF4.show(false)

//  +-------+-----------+-----------+
//  |country|sum(st_buy)|sum(nd_buy)|
//  +-------+-----------+-----------+
//  |UA     |7.0        |14.0       |
//  |PL     |9.0        |18.0       |
//  |GB     |4.0        |8.0        |
//  +-------+-----------+-----------+

  // Select
  val cols = data.columns.toSeq
  val selectDF1 = data.select(cols.head, cols.tail:_*)
  selectDF1.show(false)
//  +-------+------+------+
//  |country|st_buy|nd_buy|
//  +-------+------+------+
//  |UA     |2.0   |4.0   |
//  |PL     |3.0   |6.0   |
//  |UA     |5.0   |10.0  |
//  |PL     |6.0   |12.0  |
//  |GB     |4.0   |8.0   |
//  +-------+------+------+
}

}

1 голос
/ 16 марта 2020
{
  1.
  val sqlScript = "select col1, col2, .... from ... "
  // string we can create dynamic
  val df = spark.sql(sqlScript)
 2. try use schema =  = StructType(Seq(
  StructField("id",LongType,true),
  ....
  )) 
  // and then use schema.fieldsName... or
  val cols: List[Columns] = ...
  // in df.select(cols:_*)
 3. get schema (list fields with json file)
     package spark

import org.apache.spark.sql.{Column, DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{DataType, StructType}

import scala.io.Source

object DFFieldsWithJson extends App {

  val spark = SparkSession.builder()
    .master("local")
    .appName("DataFrame-example")
    .getOrCreate()

  import spark.implicits._

  case class TestData (
    id:         Int,
    firstName:  String,
    lastName:   String,
    descr:      String
  )

  val dataTestDF = Seq(
    TestData(1, "First Name 1", "Last Name 1", "Description 1"),
    TestData(2, "First Name 2", "Last Name 2", "Description 2"),
    TestData(3, "First Name 3", "Last Name 3", "Description 3")
  ).toDF()

  dataTestDF.show(false)
//  +---+------------+-----------+-------------+
//  |id |firstName   |lastName   |descr        |
//  +---+------------+-----------+-------------+
//  |1  |First Name 1|Last Name 1|Description 1|
//  |2  |First Name 2|Last Name 2|Description 2|
//  |3  |First Name 3|Last Name 3|Description 3|
//  +---+------------+-----------+-------------+

  val schemaJson =
    """{ "type" : "struct",
      |"fields" : [
      |{
      |    "name" : "id",
      |    "type" : "integer",
      |    "nullable" : true,
      |    "metadata" : { }
      |  },
      |  {
      |    "name" : "firstName",
      |    "type" : "string",
      |    "nullable" : true,
      |    "metadata" : {}
      |  },
      |  {
      |    "name" : "lastName",
      |    "type" : "string",
      |    "nullable" : true,
      |    "metadata" : {}
      |  }
      |  ]}""".stripMargin

  val schemaSource = schemaJson.mkString
  val schemaFromJson =   DataType.fromJson(schemaSource).asInstanceOf[StructType]

  println(schemaFromJson)
//  StructType(StructField(id,IntegerType,true), StructField(firstName,StringType,true), StructField(lastName,StringType,true))


  val cols: List[String] = schemaFromJson.fieldNames.toList
  val col: List[Column] = cols.map(dataTestDF(_))
  val df = dataTestDF.select(col: _*)


  df.printSchema()

//  root
//  |-- id: integer (nullable = false)
//  |-- firstName: string (nullable = true)
//  |-- lastName: string (nullable = true)

  df.show(false)
//  +---+------------+-----------+
//  |id |firstName   |lastName   |
//  +---+------------+-----------+
//  |1  |First Name 1|Last Name 1|
//  |2  |First Name 2|Last Name 2|
//  |3  |First Name 3|Last Name 3|
//  +---+------------+-----------+
}



}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...