PySpark Access DataFrame столбцы в пользовательской функции foreachPartition () - PullRequest
0 голосов
/ 22 мая 2018

У меня есть функция с именем "inside".Я хочу применить эту функцию к фрейму данных pyspark.Для этого я вызываю метод "foreachPartition (inside)" на созданном мною кадре.Функция "inside" нуждается в значениях кадра данных.

Фрейм данных выглядит следующим образом:

>>> small_df
DataFrame[lon: double, lat: double, t: bigint]

Код выглядит следующим образом:

def inside(iterator):
    row=iterator
    x=row.lon
    y=row.lat
    i=row.t 
    #do more stuff

small=pliades.iloc[0:20000,:] #take sample of rows from big dataset
small_df=sqlContext.createDataFrame(small) #create dataframe
test=small_df.foreachPartition(inside)

Мой вопрос: как получить x, y, я могу получить значенияпервого (lon), второго (lat) и третьего (t) столбцов кадра данных соответственно?

Я попытался также сделать это с row.lon, row.select, рассматривая его как список, но не могне получить требуемый результат.

1 Ответ

0 голосов
/ 22 мая 2018

foreach работает на RDD[Row], и каждый раздел равен Iterator[Row].Если вы хотите иметь список всех значений (не рекомендуется из-за возможных проблем с памятью

def inside(iterator):
    x, y, i = zip(*iterator)
    ...
    yield ...

В общем случае лучше просто перебирать строки по одной, не сохраняя все в памяти:

def inside(iterator):
    for x, y, i in iterator:
        yield ...

Вы также можете рассмотреть возможность использования pandas_udf:

  • Если функция возвращает одинаковое количество значений и только один столбец, вы можете использовать скалярный тип, который принимает pandas.Series ивозвращает pandas.Series

    from pyspark.sql.functions import pandas_udf, PandasUDFType
    
    @pandas_udf(schema, PandasUDFType.SCALAR)
    def f(*cols: pandas.Series) -> pandas.Series:
        ...
    
    df.select(f("col1", "col2", ...))
    
  • Сгруппированный вариант, который принимает pandas.DataFrame и возвращает pandas.DataFrame с тем же или другим количеством строк:

    from pyspark.sql.functions import spark_partition_id
    
    
    
    @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
    def g(df: pandas.DataFrame) -> pandas.DataFrame:
        ...
    
    df.groupby(spark_partition_id()).apply(g)
    
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...