Я пытаюсь преобразовать функцию pandas на двух фреймах данных в функцию pyspark.
В частности, у меня есть дата-кадр ключей и функций в виде строк, а именно:
> mv
| Keys | Formula | label |
---------------------------------------
| key1 | 'val1 + val2 - val3' | name1 |
| key2 | 'val3 + val4' | name2 |
| key3 | 'val1 - val4' | name3 |
и датафрейм df:
> df
| Keys | Datetime | names | values |
------------------------------------
| key1 | tmstmp1 | val1 | 0.3 |
| key1 | tmstmp1 | val2 | 0.4 |
| key1 | tmstmp1 | val3 | 0.2 |
| key1 | tmstmp1 | val4 | 1.2 |
| key1 | tmstmp2 | val1 | 0.5 |
| key2 | tmstmp1 | val1 | 1.1 |
| key2 | tmstmp2 | val3 | 1.0 |
| key2 | tmstmp1 | val3 | 1.3 |
и т. Д.
Я создал две функции, которые читают код, оценивают строку и выражение меры и возвращают список pandas.DataFrame, который в конце я конкатюрую.
def evaluate_vm(x, regex):
m = re.findall(regex, x)
to_replace = ['#[' + i + ']' for i in m]
replaces = [i.split(', ') for i in m]
replacement = ["df.loc[(df.Keys == %s) & (df.names == %s), ['Datetime', 'values']].dropna().set_index('Datetime')"%tuple(i) for i in replaces]
for i in range(len(to_replace)):
x = x.replace(to_replace[i], replacement[i])
return eval(x)
def _mv_(X):
formula = evaluate_vm(X.Formula)
formula['Keys'] = X.Keys
formula.reset_index(inplace = True)
formula.rename_axis({'Formula': 'Values'}, axis = 1, inplace = True)
return formula[['Keys', 'Datetime', 'names', 'Values']]
После этого мой код
res = pd.concat([_mv_(mv.loc[i]) for i in mv.index])
и res - это то, что мне нужно получить.
ПРИМЕЧАНИЕ. Я немного изменил функции и входы, чтобы сделать его понятным: во всяком случае, я не думаю, что проблема здесь.
Вот в чем дело. Я пытаюсь превратить это в pyspark.
Это код, который я написал так далеко.
from pyspark.sql.functions import pandas_udf, PandasUDFType, struct
from pyspark.sql.types import FloatType, StringType, IntegerType, TimestampType, StructType, StructField
EvaluateVM = pandas_udf(lambda x: _mv_(x),\
functionType = PandasUDFType.GROUPED_MAP, \
returnType = StructType([StructField("Keys", StringType(), False),
StructField("Datetime", TimestampType(), False),\
StructField("names", StringType(), False),\
StructField("Values", FloatType(), False)])
)
res = EvaluateVM(struct([mv[i] for i in mv.columns]))
Это "почти" работает: когда я печатаю res, вот результат.
> res
Column<<lambda>(named_struct(Keys, Keys, Formula, Formula))>
И я не вижу внутри res: я думаю, что он создал что-то вроде итерируемого питона, но я хотел бы получить тот же результат, что и в пандах.
Что мне делать? Я все понял неправильно?
РЕДАКТИРОВАТЬ: Я думаю, что проблема может быть в том, что в пандах я создаю список фреймов данных, которые я объединяю после их оценки, в pyspark я использую вид apply(_mv_, axis = 1)
: этот тип синтаксиса дал мне ошибку даже в панды (cannot concatenate dataframe of dimension 192 into one of size 5
, что-то в этом роде), и мой обходной путь был pandas.concat([…])
. Я не знаю, работает ли это и в pyspark, или есть какой-то способ избежать этого.
РЕДАКТИРОВАТЬ 2: Извините, я не написал ожидаемый вывод:
| Keys | Datetime | label | values |
---------------------------------------------
| key1 | tmstmp1 | name1 | 0.3 + 0.4 - 0.2 |
| key1 | tmstmp1 | name2 | 0.2 + 1.2 |
и так далее. Столбец значений должен содержать числовой результат, здесь я написал операнды, чтобы вы поняли.