Как создать фрейм данных в цикле for с переменной, которая повторяется в цикле - PullRequest
4 голосов
/ 20 июня 2019

Итак, у меня есть огромный фрейм данных, представляющий собой комбинацию отдельных таблиц, в конце которого есть столбец идентификатора, в котором указан номер таблицы, как показано ниже

+----------------------------+
| col1 col2 .... table_num   |
+----------------------------+
| x     y            1       |
| a     b            1       |
| .     .            .       |
| .     .            .       |
| q     p            2       |
+----------------------------+

(исходная таблица)

Я должен разделить это на несколько маленьких фреймов данных на основе номера таблицы.Число таблиц, объединенных для создания этого, довольно велико, так что не представляется возможным индивидуально создавать несвязанные подмножества данных, поэтому я подумал, что если бы я сделал цикл for, повторяющийся от минимального до максимального значения table_num, я мог бы выполнить эту задачу, но я не могу 'Похоже, что любая помощь приветствуется.

Это то, что я придумал

for (x < min(table_num) to max(table_num)) {

var df(x)= spark.sql("select * from df1 where state = x")
df(x).collect()

, но я не думаю, что декларация верна.

по сути, мне нужны df, которые выглядят так:

+-----------------------------+
| col1  col2  ...   table_num |
+-----------------------------+
| x      y             1      |
| a      b             1      |
+-----------------------------+


+------------------------------+
| col1   col2  ...   table_num |
+------------------------------+
| xx      xy             2     |
| aa      bb             2     |
+------------------------------+

+-------------------------------+
| col1    col2  ...   table_num |
+-------------------------------+
| xxy      yyy             3    |
| aaa      bbb             3    |
+-------------------------------+

... и так далее ...

(как бы мне хотелосьДатафреймы разделены)

Ответы [ 3 ]

0 голосов
/ 20 июня 2019

Подход состоит в том, чтобы собрать все уникальные ключи и построить соответствующие фреймы данных. Я добавил немного функционального аромата.

Образец набора данных:

  name,year,country,id
  Bayern Munich,2014,Germany,7747
  Bayern Munich,2014,Germany,7747
  Bayern Munich,2014,Germany,7746
  Borussia Dortmund,2014,Germany,7746
  Borussia Mönchengladbach,2014,Germany,7746
  Schalke 04,2014,Germany,7746
  Schalke 04,2014,Germany,7753
  Lazio,2014,Germany,7753

Код:

  val df = spark.read.format(source = "csv")
    .option("header", true)
    .option("delimiter", ",")
    .option("inferSchema", true)
    .load("groupby.dat")

  import spark.implicits._

  //collect data for each key into a data frame
  val uniqueIds = df.select("id").distinct().map(x => x.mkString.toInt).collect()
  // List buffer to hold separate data frames
  var dataframeList: ListBuffer[org.apache.spark.sql.DataFrame] = ListBuffer()
  println(uniqueIds.toList)

  // filter data
  uniqueIds.foreach(x => {
    val tempDF = df.filter(col("id") === x)
    dataframeList += tempDF
  })

  //show individual data frames
  for (tempDF1 <- dataframeList) {
    tempDF1.show()
  }

0 голосов
/ 20 июня 2019

Один из подходов заключается в write DataFrame как разделенном файлах паркета и read их обратно в Map, как показано ниже:

import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq(
  ("a", "b", 1), ("c", "d", 1), ("e", "f", 1), 
  ("g", "h", 2), ("i", "j", 2)
).toDF("c1", "c2", "table_num")

val filePath = "/path/to/parquet/files"

df.write.partitionBy("table_num").parquet(filePath)

val tableNumList = df.select("table_num").distinct.map(_.getAs[Int](0)).collect
// tableNumList: Array[Int] = Array(1, 2)

val dfMap = ( for { n <- tableNumList } yield
    (n, spark.read.parquet(s"$filePath/table_num=$n").withColumn("table_num", lit(n)))
  ).toMap

Для доступаотдельные кадры данных из Map:

dfMap(1).show
// +---+---+---------+
// | c1| c2|table_num|
// +---+---+---------+
// |  a|  b|        1|
// |  c|  d|        1|
// |  e|  f|        1|
// +---+---+---------+

dfMap(2).show
// +---+---+---------+
// | c1| c2|table_num|
// +---+---+---------+
// |  g|  h|        2|
// |  i|  j|        2|
// +---+---+---------+
0 голосов
/ 20 июня 2019

В Spark Arrays могут быть почти в типе данных.Когда вы сделаны как vars, вы можете динамически добавлять и удалять элементы из них.Ниже я собираюсь выделить номера таблиц в их собственный массив, чтобы я мог легко их перебирать.После выделения я иду через цикл while, чтобы добавить каждую таблицу в качестве уникального элемента в массив DF Holder.Для запроса элементов массива используйте DFHolderArray (n-1), где n - это позиция, которую вы хотите запросить, а 0 - первый элемент.

//This will go and turn the distinct row nums in a queriable (this is 100% a word) array
val tableIDArray = inputDF.selectExpr("table_num").distinct.rdd.map(x=>x.mkString.toInt).collect

//Build the iterator
var iterator = 1  

//holders for DF and transformation step
var tempDF = spark.sql("select 'foo' as bar")
var interimDF = tempDF

//This will be an array for dataframes
var DFHolderArray : Array[org.apache.spark.sql.DataFrame] = Array(tempDF) 

//loop while the you have note reached end of array
while(iterator<=tableIDArray.length) {
  //Call the table that is stored in that location of the array
  tempDF = spark.sql("select * from df1 where state = '" + tableIDArray(iterator-1) + "'")
  //Fluff
  interimDF = tempDF.withColumn("User_Name", lit("Stack_Overflow"))
  //If logic to overwrite or append the DF
  DFHolderArray = if (iterator==1) {
    Array(interimDF)
  } else {
    DFHolderArray ++ Array(interimDF)
  }
  iterator = iterator + 1
}

//To query the data
DFHolderArray(0).show(10,false)
DFHolderArray(1).show(10,false)
DFHolderArray(2).show(10,false)
//....
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...