Spark - чтение файла CSV и принудительное включение последовательного режима - PullRequest
0 голосов
/ 14 апреля 2020

Все,

Я обрабатываю файлы данных поставщика и добавляю несколько дополнительных полей (ОБОГАЩЕНИЕ), и мне необходимо всегда поддерживать порядок файлов.

Следовательно, чтобы достичь выше Я добавляю идентификатор последовательности с помощью monotonically_increasing_id (); Как я могу гарантировать, что эта операция выполняется с разделом = 1, чтобы идентификатор не повторялся .. Я открыт для альтернативных предложений.

    val srcDF = spark.read.textFile(PATH).withColumn("idCol", monotonically_increasing_id())

1 Ответ

0 голосов
/ 14 апреля 2020

Вместо monotonically_increasing_id используйте row_number оконную функцию.

  • Используйте spark.read.csv, если вы читаете файл с разделителями.

Example:

//sample data

$cat t1.txt
NAME|AGE|COUNTRY
d|18|USA
a|18|USA
b|20|Germany
c|23|USA

import org.apache.spark.sql.expressions.Window

val w=Window.orderBy("NAME")

spark.
read.
option("header",true).
option("delimiter","|").
csv("t1.txt").
withColumn("idCol",row_number().over(w)).
show()

//+----+---+-------+-----+
//|NAME|AGE|COUNTRY|idCol|
//+----+---+-------+-----+
//|   a| 18|    USA|    1|
//|   b| 20|Germany|    2|
//|   c| 23|    USA|    3|
//|   d| 18|    USA|    4|
//+----+---+-------+-----+

Мы упорядочиваем по столбцу NAME, а затем добавление idCol будет назначено всем строки без повторений.

Кроме того, если столбец orderby отсутствует, попробуйте:

val w=Window.orderBy(lit("1"))

spark.
read.
option("header",true).
option("delimiter","|").
csv("t1.txt").
withColumn("idCol",row_number().over(w)).
show()

//+----+---+-------+-----+
//|NAME|AGE|COUNTRY|idCol|
//+----+---+-------+-----+
//|   d| 18|    USA|    1|
//|   a| 18|    USA|    2|
//|   b| 20|Germany|    3|
//|   c| 23|    USA|    4|
//+----+---+-------+-----+
...