добавление уникального последовательного номера строки в фрейм данных в pyspark - PullRequest
0 голосов
/ 31 октября 2018

Я хочу добавить уникальный номер строки в мой фрейм данных в pyspark и не хочу использовать методы monotonicallyIncreasingId & partitionBy. Я думаю, что этот вопрос может быть дубликатом аналогичных вопросов, которые задавались ранее, и все еще ищу некоторый совет, правильно ли я это делаю или нет. Ниже приведен фрагмент моего кода: У меня есть CSV-файл с набором входных данных ниже:

1,VIKRANT SINGH RANA    ,NOIDA   ,10000
3,GOVIND NIMBHAL        ,DWARKA  ,92000
2,RAGHVENDRA KUMAR GUPTA,GURGAON ,50000
4,ABHIJAN SINHA         ,SAKET   ,65000
5,SUPER DEVELOPER       ,USA     ,50000
6,RAJAT TYAGI           ,UP      ,65000
7,AJAY SHARMA           ,NOIDA   ,70000
8,SIDDHARTH BASU        ,SAKET   ,72000
9,ROBERT                ,GURGAON ,70000

и я загрузил этот CSV-файл в фрейм данных.

PATH_TO_FILE="file:///u/user/vikrant/testdata/EMP_FILE.csv"

emp_df = spark.read.format("com.databricks.spark.csv") \
  .option("mode", "DROPMALFORMED") \
  .option("header", "true") \
  .option("inferschema", "true") \
  .option("delimiter", ",").load(PATH_TO_FILE)

+------+--------------------+--------+----------+
|emp_id|            emp_name|emp_city|emp_salary|
+------+--------------------+--------+----------+
|     1|VIKRANT SINGH RAN...|NOIDA   |     10000|
|     3|GOVIND NIMBHAL   ...|DWARKA  |     92000|
|     2|RAGHVENDRA KUMAR ...|GURGAON |     50000|
|     4|ABHIJAN SINHA    ...|SAKET   |     65000|
|     5|SUPER DEVELOPER  ...|USA     |     50000|
|     6|RAJAT TYAGI      ...|UP      |     65000|
|     7|AJAY SHARMA      ...|NOIDA   |     70000|
|     8|SIDDHARTH BASU   ...|SAKET   |     72000|
|     9|ROBERT           ...|GURGAON |     70000|
+------+--------------------+--------+----------+

empRDD = emp_df.rdd.zipWithIndex()
newRDD=empRDD.map(lambda x: (list(x[0]) + [x[1]]))
 newRDD.take(2);
[[1, u'VIKRANT SINGH RANA    ', u'NOIDA   ', 10000, 0], [3, u'GOVIND NIMBHAL        ', u'DWARKA  ', 92000, 1]]

когда я включил значение int в свой список, я потерял схему dataframe.

newdf=newRDD.toDF(['emp_id','emp_name','emp_city','emp_salary','row_id'])
newdf.show();

+------+--------------------+--------+----------+------+
|emp_id|            emp_name|emp_city|emp_salary|row_id|
+------+--------------------+--------+----------+------+
|     1|VIKRANT SINGH RAN...|NOIDA   |     10000|     0|
|     3|GOVIND NIMBHAL   ...|DWARKA  |     92000|     1|
|     2|RAGHVENDRA KUMAR ...|GURGAON |     50000|     2|
|     4|ABHIJAN SINHA    ...|SAKET   |     65000|     3|
|     5|SUPER DEVELOPER  ...|USA     |     50000|     4|
|     6|RAJAT TYAGI      ...|UP      |     65000|     5|
|     7|AJAY SHARMA      ...|NOIDA   |     70000|     6|
|     8|SIDDHARTH BASU   ...|SAKET   |     72000|     7|
|     9|ROBERT           ...|GURGAON |     70000|     8|
+------+--------------------+--------+----------+------+

Я делаю это правильно? или есть ли лучший способ добавить или сохранить схему dataframe в pyspark?

Можно ли использовать метод zipWithIndex для добавления уникального последовательного номера строки для кадра данных большого размера? Можем ли мы использовать этот row_id для повторного разбиения кадра данных, чтобы равномерно распределить данные по разделам?

Ответы [ 2 ]

0 голосов
/ 28 ноября 2018

Я нашел решение, и оно очень простое. поскольку в моем фрейме данных нет столбца, который имеет одинаковое значение во всех строках, поэтому использование row_number не генерирует уникальные номера строк при использовании его с предложением partitionBy.

Позволяет добавить новый столбец к существующему фрейму данных с некоторым значением по умолчанию.

emp_df= emp_df.withColumn("new_column",lit("ABC"))

и создайте оконную функцию с paritionBy, используя этот столбец "new_column"

w = Window().partitionBy('new_column').orderBy(lit('A'))
df = emp_df.withColumn("row_num", row_number().over(w)).drop("new_column")

вы получите желаемый результат:

+------+--------------------+--------+----------+-------+
|emp_id|            emp_name|emp_city|emp_salary|row_num|
+------+--------------------+--------+----------+-------+
|     1|VIKRANT SINGH RAN...|NOIDA   |     10000|      1|
|     2|RAGHVENDRA KUMAR ...|GURGAON |     50000|      2|
|     7|AJAY SHARMA      ...|NOIDA   |     70000|      3|
|     9|ROBERT           ...|GURGAON |     70000|      4|
|     4|ABHIJAN SINHA    ...|SAKET   |     65000|      5|
|     8|SIDDHARTH BASU   ...|SAKET   |     72000|      6|
|     5|SUPER DEVELOPER  ...|USA     |     50000|      7|
|     3|GOVIND NIMBHAL   ...|DWARKA  |     92000|      8|
|     6|RAJAT TYAGI      ...|UP      |     65000|      9|
+------+--------------------+--------+----------+-------+
0 голосов
/ 01 ноября 2018

Мне не удалось отредактировать свой вопрос или ответ (из-за моей репутации), поэтому ниже приведены мои мысли об использовании оконной функции для генерации уникального номера строки для данного кадра данных в Pyspark.

Я попытался использовать row_number с оконной функцией и partitionBy в столбце emp_city. он не генерирует уникальный номер строки, поскольку столбец emp_city не имеет того же значения. Как я могу получить уникальные последовательные номера строк для n строк в моем фрейме данных. Я смог сделать это с помощью zipWithIndex.

from pyspark.sql.functions import row_number,lit,rank
from pyspark.sql.window import Window

w = Window().partitionBy('emp_city').orderBy(lit('A'))
df = emp_df.withColumn("row_num", row_number().over(w))

+------+--------------------+--------+----------+-------+
|emp_id|            emp_name|emp_city|emp_salary|row_num|
+------+--------------------+--------+----------+-------+
|     3|GOVIND NIMBHAL   ...|DWARKA  |     92000|      1|
|     4|ABHIJAN SINHA    ...|SAKET   |     65000|      1|
|     8|SIDDHARTH BASU   ...|SAKET   |     72000|      2|
|     1|VIKRANT SINGH RAN...|NOIDA   |     10000|      1|
|     7|AJAY SHARMA      ...|NOIDA   |     70000|      2|
|     2|RAGHVENDRA KUMAR ...|GURGAON |     50000|      1|
|     9|ROBERT           ...|GURGAON |     70000|      2|
|     6|RAJAT TYAGI      ...|UP      |     65000|      1|
|     5|SUPER DEVELOPER  ...|USA     |     50000|      1|
+------+--------------------+--------+----------+-------+

и если я не использую partitionBy с оконной функцией, она работает нормально по мере необходимости, но она перемещает все данные из 'n' разделов в один раздел

newdf=emp_df.repartition("emp_city")
partitionSizes = newdf.rdd.glom().map(len).collect();
print partitionSizes
[0, 0, 0, 0, 4, 0, 2, 1, 1, 1]
w = Window().orderBy(lit('A'))
df = newdf.withColumn("row_num", row_number().over(w))

18/11/01 05:46:12 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+------+--------------------+--------+----------+-------+
|emp_id|            emp_name|emp_city|emp_salary|row_num|
+------+--------------------+--------+----------+-------+
|     1|VIKRANT SINGH RAN...|NOIDA   |     10000|      1|
|     2|RAGHVENDRA KUMAR ...|GURGAON |     50000|      2|
|     7|AJAY SHARMA      ...|NOIDA   |     70000|      3|
|     9|ROBERT           ...|GURGAON |     70000|      4|
|     4|ABHIJAN SINHA    ...|SAKET   |     65000|      5|
|     8|SIDDHARTH BASU   ...|SAKET   |     72000|      6|
|     5|SUPER DEVELOPER  ...|USA     |     50000|      7|
|     3|GOVIND NIMBHAL   ...|DWARKA  |     92000|      8|
|     6|RAJAT TYAGI      ...|UP      |     65000|      9|
+------+--------------------+--------+----------+-------+

Я ищу возможное решение для создания нового уникального номера для моего кадра данных, которое должно быть последовательным.

...