Spark DataFrame / DataSet разбивает на страницы или выполняет итерацию фрагмента N строки одновременно - PullRequest
0 голосов
/ 02 октября 2018

Мне нужно реализовать разбиение на страницы для моего набора данных (в искровых скалах).

Если в наборе искровых данных 100 записей, то мне нужно разбить на 20 партий по 5 элементов в каждой партии.

Пожалуйста, как разбить набор данных / фрейм искровых данных на N номер строки?

- NS

Ответы [ 2 ]

0 голосов
/ 03 октября 2018

Не уверен, что есть лучший подход, но вы можете попробовать: преобразовать ваш фрейм данных в rdd, использовать zipWithIndex, отфильтровать, а затем снова конвертировать в фрейм данных.

Например, предположим, что ваш фрейм данных задан как

scala> val df=sc.parallelize(1 to 100).toDF("value")
df: org.apache.spark.sql.DataFrame = [value: int]

scala> df.show()
+-----+
|value|
+-----+
|    1|
|    2|
|    3|
|    4|
|    5|
|    6|
|    7|
|    8|
|    9|
|   10|
|   11|
|   12|
|   13|
|   14|
|   15|
|   16|
|   17|
|   18|
|   19|
|   20|
+-----+
only showing top 20 rows

Преобразовать в rdd и zip с индексом следующим образом: val dfRDD=df.rdd.zipWithIndex

scala> val dfRDD=df.rdd.zipWithIndex
dfRDD: org.apache.spark.rdd.RDD[(org.apache.spark.sql.Row, Long)] = ZippedWithIndexRDD[81] at zipWithIndex at <console>:69

Для вашего первого кадра данных, содержащего строки от 1 до 5, выполните фильтрацию следующим образом:

val firstDF=dfRDD.filter{case(datum,index)=>(0 to 4).contains(index)}.map(_._1)
scala> val firstDF=dfRDD.filter{case(datum,index)=>(0 to 4).contains(index)}.map(_._1)
firstDF: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[85] at map at <console>:71

Наконец, преобразуйте в фрейм данных следующим образом: sqlContext.createDataFrame(firstDF,df.schema)

scala> sqlContext.createDataFrame(firstDF,df.schema).show()
+-----+
|value|
+-----+
|    1|
|    2|
|    3|
|    4|
|    5|
+-----+

Вы должны повторить эти шаги для оставшихся строк (5 to 9), (10 to 14) и т. Д.

РЕДАКТИРОВАТЬ: Чтобы сделать вещи немного быстрее, я определил метод

def splitDF(range:scala.collection.immutable.Range.Inclusive):org.apache.spark.sql.DataFrame={
    val mySplitRDD=dfRDD.filter{case(datum,index)=>range.contains(index)}.map(_._1)
    val mySplitDF=sqlContext.createDataFrame(mySplitRDD,df.schema)
    mySplitDF
}

Затем используйте карту, чтобы получить все расщепления, например

val dataframes=List((0 to 4), (5 to 9),(10 to 14)).map(i=>splitDF(i))

scala> val dataframes=List((0 to 4), (5 to 9),(10 to 14)).map(i=>splitDF(i))
dataframes: List[org.apache.spark.sql.DataFrame] = List([value: int], [value: int], [value: int])

Dataframe 2:

scala> dataframes(1).show()
+-----+
|value|
+-----+
|    6|
|    7|
|    8|
|    9|
|   10|
+-----+

Фрейм данных 1:

scala> dataframes(0).show()
+-----+
|value|
+-----+
|    1|
|    2|
|    3|
|    4|
|    5|
+-----+

Фрейм данных 3:

scala> dataframes(2).show()
+-----+
|value|
+-----+
|   11|
|   12|
|   13|
|   14|
|   15|
+-----+
0 голосов
/ 02 октября 2018

Хотите ли вы выполнить дальнейшие реляционные операции с этими разделенными наборами данных?Если нет, и если у вас есть только порядка 100 строк, то я бы сделал что-то вроде

ds.collect.grouped(5)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...