Я довольно новичок в PySpark с некоторым опытом работы с Python.Я уже в состоянии отфильтровать строки в фрейме данных и написал udf, которые вычисляют результаты по массивам в ячейках DataFrame с int или double в качестве результата.Нет, мне нужен массив в качестве вывода, и после нескольких часов я не нашел полезного примера.
Вот проблема:
DataFrame имеет следующую схему, где число - это числозаписи массивов той же строки DataFrame:
DataFrame[number: int, code: array<string>, d1: array<double>, d2: array<double>]
Вот пример DataFrame с именем df1:
[4 ,['correct', 'correct', 'wrong', 'correct'], [33, 42, 35, 76], [12, 35, 15, 16]]
[2 ,['correct', 'wrong'], [47, 43], [13, 17]]
Теперь, только если у меня есть «правильный» в iПоложение кодового столбца строки DataFrame. Я хочу сохранить позицию i d1 и d2.Дополнительно я хочу иметь новый номерNew с оставшимся количеством позиций.Результирующая структура и DataFrame «df2» должны выглядеть следующим образом:
DataFrame[number: int, numberNew: int, code: array<string>, d1: array<double>, d2: array<double>]
[4 , 3, ['correct', 'correct', 'correct'], [33, 42, 76], [12, 35, 16]]
[2 , 1, ['correct'], [47], [13]]
Среди нескольких других вещей (и на основе успешного решения в Python) я попробовал следующий код:
def filterDF(number, code, d1, d2):
dataFiltered = []
numberNew = 0
for i in range(number):
if code[i] == 'correct':
dataFiltered.append([d1[i],d2[i]])
countNew += 1
newTable = {'countNew' : countNew, 'data' : dataFiltered}
newDf = pd.DataFrame(newTable)
return newDf
from pyspark.sql.types import ArrayType
filterDFudf = sqlContext.udf.register("filterDF", filterDF, "Array<double>")
df2 = df1.select(df1.number, filterDFudf(df1.number, df1.code, df1.d1, df1.d2)).alias('dataNew')
Я получил довольно длинное и не очень полезное сообщение об ошибке.То есть была следующая информация: TypeError: у объекта 'float' нет атрибута ' getitem '
Было бы здорово, если бы кто-то здесь мог показать мне, как решить эту проблему.