Scala: Как я могу разделить фрейм данных по номеру строки? - PullRequest
1 голос
/ 14 марта 2019

Я хочу разделить фрейм данных, состоящий из 2,7 млн. Строк, на небольшие фреймы, состоящие из 100000 строк, поэтому в итоге получится примерно 27 фреймов, которые я тоже хочу сохранить в виде CSV-файлов.

Я уже посмотрел этот раздел Byy и groupBy, но мне не нужно беспокоиться о каких-либо условиях, за исключением того, что они должны быть упорядочены по дате. Я пытаюсь написать свой собственный код для этой работы, но если вы знаете о некоторых функциях Scala (Spark), которые я мог бы использовать, это было бы здорово!

Спасибо всем за предложения!

1 Ответ

2 голосов
/ 14 марта 2019

Вы можете использовать zipWithIndex из API RDD (к сожалению, нет эквивалента в SparkSQL), который сопоставляет каждую строку с индексом в диапазоне от 0 до rdd.count - 1.

Так что если у вас есть фрейм данныхчто я предположил, чтобы быть отсортированы соответственно, вам нужно будет переходить между двумя API следующим образом:

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

// creating mock data
val df = spark.range(100).withColumn("test", 'id % 10)

// zipping the data
val partitionSize = 5 // I use 5 but you can use 100000 in your case
val zipped_rdd = df.rdd
    .zipWithIndex.map{ case (row, id) => 
        Row.fromSeq(row.toSeq :+ id / partitionSize ) 
    }

//back to df
val newField = StructField("partition", LongType, false)
val zipped_df = spark
    .createDataFrame(zipped_rdd, df.schema.add(newField))

Давайте посмотрим на данные, у нас есть новый столбец с именем раздела, и это соответствуеттак, как вы хотите разделить ваши данные.

zipped_df.show(15) // 5 rows by partition
+---+----+---------+
| id|test|partition|
+---+----+---------+
|  0|   0|        0|
|  1|   1|        0|
|  2|   2|        0|
|  3|   3|        0|
|  4|   4|        0|
|  5|   5|        1|
|  6|   6|        1|
|  7|   7|        1|
|  8|   8|        1|
|  9|   9|        1|
| 10|   0|        2|
| 11|   1|        2|
| 12|   2|        2|
| 13|   3|        2|
| 14|   4|        2|
+---+----+---------+

// using partitionBy to write the data
zipped_df.write
    .partitionBy("partition")
    .csv(".../testPart.csv")
...