Преобразование данных Pyspark в панд сбрасывает данные? - PullRequest
0 голосов
/ 01 мая 2018

У меня довольно сложный процесс создания фрейма данных pyspark, преобразования его в фрейм данных pandas и вывода результата в плоский файл. Я не уверен, когда произойдет ошибка, поэтому я опишу весь процесс.

Начиная у меня есть фрейм данных pyspark, который содержит попарное сходство для наборов идентификаторов. Это выглядит так:

  +------+-------+-------------------+
  |  ID_A|   ID_B|  EuclideanDistance|
  +------+-------+-------------------+
  |     1|      1|                0.0|
  |     1|      2|0.13103884200454394|
  |     1|      3| 0.2176246463836219|
  |     1|      4|  0.280568636550471|
 ...

Я бы хотел сгруппировать его по ID_A, отсортировать каждую группу по EuclideanDistance и взять только верхние N пар для каждой группы. Итак, сначала я делаю это:

from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col, row_number

window = Window.partitionBy(df['ID_A']).orderBy(df_sim['EuclideanDistance'])
result = (df.withColumn('row_num', row_number().over(window)))

Я уверен, что ID_A = 1 все еще находится в кадре данных "result". Затем я делаю это, чтобы ограничить каждую группу всего 20 строками:

result1 = result.where(result.row_num<20)
result1.toPandas().to_csv("mytest.csv")

и ID_A = 1 НЕ в результирующем файле .csv (хотя он все еще присутствует в result1). Есть ли где-то проблема в этой цепочке конверсий, которая может привести к потере данных?

Ответы [ 2 ]

0 голосов
/ 07 мая 2018

Как упоминал Дэвид, вы ссылаетесь на второй фрейм данных "df_sim" в своей оконной функции.

Я проверил следующее, и оно работает на моей машине (известные последние слова):

from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col, row_number
import pandas as pd

#simulate some data
df = pd.DataFrame({'ID_A': pd.np.arange(100)%5, 
    'ID_B': pd.np.repeat(pd.np.arange(20),5), 
    'EuclideanDistance': pd.np.random.rand(100)*5}
    )
#artificially set distance between point and self to 0
df['EuclideanDistance'][df['ID_A'] == df['ID_B']] = 0
df = spark.createDataFrame(df)
#end simulation
window = Window.partitionBy(df['ID_A']).orderBy(df['EuclideanDistance'])
output = df.select('*', row_number().over(window).alias('rank')).filter(col('rank') <= 10)
output.show(50)

Код симуляции предназначен только для того, чтобы сделать это автономным примером. Конечно, вы можете использовать свой фактический фрейм данных и игнорировать симуляцию при тестировании. Надеюсь, что это работает!

0 голосов
/ 01 мая 2018

Вы ссылаетесь на 2 кадра данных в окне вашего решения. Не уверен, что это вызывает вашу ошибку, но это стоит исправить. В любом случае вам не нужно ссылаться на определенный фрейм данных в определении окна . В любом случае попробуйте

window = Window.partitionBy('ID_A').orderBy('EuclideanDistance')
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...