Сложные функции от Python до Pyspark - РЕДАКТИРОВАТЬ: проблема конкатенации (я думаю) - PullRequest
0 голосов
/ 29 марта 2019

Я пытаюсь преобразовать функцию pandas на двух фреймах данных в функцию pyspark.

В частности, у меня есть дата-кадр ключей и функций в виде строк, а именно:

> mv

| Keys |       Formula        | label |
---------------------------------------
| key1 | 'val1 + val2 - val3' | name1 |
| key2 | 'val3 + val4'        | name2 |
| key3 | 'val1 - val4'        | name3 |

и датафрейм df:

> df

| Keys | Datetime | names | values |
------------------------------------
| key1 | tmstmp1  | val1  |  0.3   |
| key1 | tmstmp1  | val2  |  0.4   |
| key1 | tmstmp1  | val3  |  0.2   |
| key1 | tmstmp1  | val4  |  1.2   |
| key1 | tmstmp2  | val1  |  0.5   |
| key2 | tmstmp1  | val1  |  1.1   |
| key2 | tmstmp2  | val3  |  1.0   |
| key2 | tmstmp1  | val3  |  1.3   |

и т. Д.

Я создал две функции, которые читают код, оценивают строку и выражение меры и возвращают список pandas.DataFrame, который в конце я конкатюрую.

def evaluate_vm(x, regex):
    m = re.findall(regex, x)
    to_replace = ['#[' + i + ']' for i in m]
    replaces = [i.split(', ') for i in m]
    replacement = ["df.loc[(df.Keys == %s) & (df.names == %s), ['Datetime', 'values']].dropna().set_index('Datetime')"%tuple(i) for i in replaces]
    for i in range(len(to_replace)):
        x = x.replace(to_replace[i], replacement[i])
    return eval(x)


def _mv_(X):
    formula = evaluate_vm(X.Formula)
    formula['Keys'] = X.Keys
    formula.reset_index(inplace = True)
    formula.rename_axis({'Formula': 'Values'}, axis = 1, inplace = True)
    return formula[['Keys', 'Datetime', 'names', 'Values']]

После этого мой код

res = pd.concat([_mv_(mv.loc[i]) for i in mv.index])

и res - это то, что мне нужно получить.

ПРИМЕЧАНИЕ. Я немного изменил функции и входы, чтобы сделать его понятным: во всяком случае, я не думаю, что проблема здесь.

Вот в чем дело. Я пытаюсь превратить это в pyspark.

Это код, который я написал так далеко.

from pyspark.sql.functions import pandas_udf, PandasUDFType, struct
from pyspark.sql.types import FloatType, StringType, IntegerType, TimestampType, StructType, StructField

EvaluateVM = pandas_udf(lambda x: _mv_(x),\
                        functionType = PandasUDFType.GROUPED_MAP, \
                        returnType = StructType([StructField("Keys", StringType(), False),
                                                 StructField("Datetime", TimestampType(), False),\
                                                 StructField("names", StringType(), False),\
                                                 StructField("Values", FloatType(), False)])
                       )

res = EvaluateVM(struct([mv[i] for i in mv.columns]))

Это "почти" работает: когда я печатаю res, вот результат.

> res
Column<<lambda>(named_struct(Keys, Keys, Formula, Formula))>

И я не вижу внутри res: я думаю, что он создал что-то вроде итерируемого питона, но я хотел бы получить тот же результат, что и в пандах.

Что мне делать? Я все понял неправильно?

РЕДАКТИРОВАТЬ: Я думаю, что проблема может быть в том, что в пандах я создаю список фреймов данных, которые я объединяю после их оценки, в pyspark я использую вид apply(_mv_, axis = 1): этот тип синтаксиса дал мне ошибку даже в панды (cannot concatenate dataframe of dimension 192 into one of size 5, что-то в этом роде), и мой обходной путь был pandas.concat([…]). Я не знаю, работает ли это и в pyspark, или есть какой-то способ избежать этого.

РЕДАКТИРОВАТЬ 2: Извините, я не написал ожидаемый вывод:

| Keys | Datetime | label |     values      |
---------------------------------------------
| key1 | tmstmp1  | name1 | 0.3 + 0.4 - 0.2 |
| key1 | tmstmp1  | name2 |    0.2 + 1.2    |

и так далее. Столбец значений должен содержать числовой результат, здесь я написал операнды, чтобы вы поняли.

...