Можем ли мы указать идентификатор последовательности для строк в Spark? - PullRequest
0 голосов
/ 28 мая 2020

Я новичок в Spark, и у меня есть около 10000 строк в файле данных для чтения

SparkSession sessionSpark = SparkSession
            .builder()
            .config(sparkConf)
            .getOrCreate();


Dataset<Row> dataset = sessionSpark.read.parquet("s3://databucket/files/")

У меня есть вариант использования, чтобы добавить номер строки в каждую строку в наборе данных, номер строки должен начинаться с От 1 до 10000 (поскольку файл имеет 10000 записей), можно ли назначить номер строки, и мы знаем, что Spark перетасовывает данные, но, скажем, даже после повторного запуска одного и того же файла из приложения дважды сгенерированный номер строки должен быть таким же, можно ли сделать?

Ответы [ 3 ]

1 голос
/ 28 мая 2020

РЕДАКТИРОВАТЬ : решение, которое соответствует последовательному идентификатору, начиная с 1

Если вы можете заказать их по чему-то, это должно быть возможно. В этом примере может быть scala, но основной частью по-прежнему является часть SQL.

val df = sc.parallelize(Seq(("alfa", 10), ("beta", 20), ("gama", 5))).toDF("word", "count")
df.createOrReplaceTempView("wordcount")

// MAIN PART
val tmpTable = spark.sqlContext.sql("select row_number() over (order by count) as index,word,count from wordcount")

tmpTable.show()

+-----+----+-----+
|index|word|count|
+-----+----+-----+
|    1|gama|    5|
|    2|alfa|   10|
|    3|beta|   20|
+-----+----+-----+

EDIT : если вам не нужны простые числа, go с га sh ряда. Так лучше.

1 голос
/ 28 мая 2020

monotonically_increasing_id() добавит инкрементный идентификатор для ваших строк

import org.apache.spark.sql.functions._
Dataset<Row> dataset = sessionSpark.read.parquet("s3://databucket/files/").withColumn("rowNum", monotonically_increasing_id())

Из официальных документов Spark

Выражение столбца, которое генерирует монотонно возрастающие 64-битные целые числа.

Сгенерированный идентификатор гарантированно будет монотонно увеличивающимся и уникальным, но не последовательным. Текущая реализация помещает идентификатор раздела в верхний 31 бит, а номер записи в каждом разделе - в нижние 33 бита. Предполагается, что фрейм данных имеет менее 1 миллиарда разделов, а в каждом разделе меньше 8 миллиардов записей.

В качестве примера рассмотрим DataFrame с двумя разделами, каждый с 3 записями. Это выражение вернет следующие идентификаторы:

{{{0, 1, 2, 8589934592 (1L << 33), 8589934593, 8589934594.}}} </p>

0 голосов
/ 28 мая 2020

Не на Java, я не специализируюсь на этом, а на Scala. Должно быть достаточно легко преобразовать для вас. Просто пример, который у меня есть с использованием DS с классами case:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.{Encoder, Encoders}
import spark.implicits._

// Gen some example data via DF, can come from files, ordering in those files assumed. I.e. no need to sort.
val df = Seq(
  ("1 February"), ("n"), ("c"), ("b"), 
  ("2 February"), ("hh"), ("www"), ("e"), 
  ("3 February"), ("y"), ("s"), ("j"),
  ("1 March"), ("c"), ("b"), ("x"),
  ("1 March"), ("c"), ("b"), ("x"),
  ("2 March"), ("c"), ("b"), ("x"),
  ("3 March"), ("c"), ("b"), ("x"), ("y"), ("z")
           ).toDF("line")

// Define Case Classes to avoid Row aspects on df --> rdd --> to DF. 
case class X(line: String)   
case class Xtra(key: Long, line: String)

// Add the Seq Num using zipWithIndex. Then convert back, but will have a struct to deal wit.
// You can avoid the struct if using Row and such. But general idea should be clear.
val rdd = df.as[X].rdd.zipWithIndex().map{case (v,k) => (k,v)}
val ds = rdd.toDF("key", "line").as[Xtra]
ds.show(100,false)

возвращает:

+---+------------+
|key|line        |
+---+------------+
|0  |[1 February]|
|1  |[n]         |  
|2  |[c]         |
...

Ответы на сегодняшний день не соответствуют потребностям в качестве вопроса, но если только 10K строк, тогда единственный раздел не является проблемой. Хотя для строк размером 10К нужно задать несколько вопросов.

Если вы не против Row, вот другой подход:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField,StructType,IntegerType, ArrayType, LongType}

val df = sc.parallelize(Seq((1.0, 2.0), (0.0, -1.0), (3.0, 4.0), (6.0, -2.3))).toDF("x", "y")
val newSchema = StructType(df.schema.fields ++ Array(StructField("rowid", LongType, false)))

val rddWithId = df.rdd.zipWithIndex
val dfZippedWithId =  spark.createDataFrame(rddWithId.map{ case (row, index) => Row.fromSeq(row.toSeq ++ Array(index))}, newSchema)
...