Фильтрация в массивах, расположенных в ячейках pyspark.sql.dataframe - PullRequest
0 голосов
/ 10 октября 2018

Я довольно новичок в PySpark с некоторым опытом работы с Python.Я уже в состоянии отфильтровать строки в фрейме данных и написал udf, которые вычисляют результаты по массивам в ячейках DataFrame с int или double в качестве результата.Нет, мне нужен массив в качестве вывода, и после нескольких часов я не нашел полезного примера.

Вот проблема:

DataFrame имеет следующую схему, где число - это числозаписи массивов той же строки DataFrame:

DataFrame[number: int, code: array<string>, d1: array<double>, d2: array<double>]

Вот пример DataFrame с именем df1:

[4 ,['correct', 'correct', 'wrong', 'correct'], [33, 42, 35, 76], [12, 35, 15, 16]] 
[2 ,['correct', 'wrong'], [47, 43], [13, 17]] 

Теперь, только если у меня есть «правильный» в iПоложение кодового столбца строки DataFrame. Я хочу сохранить позицию i d1 и d2.Дополнительно я хочу иметь новый номерNew с оставшимся количеством позиций.Результирующая структура и DataFrame «df2» должны выглядеть следующим образом:

DataFrame[number: int, numberNew: int, code: array<string>, d1: array<double>, d2: array<double>]

[4 , 3, ['correct', 'correct', 'correct'], [33, 42, 76], [12, 35, 16]] 
[2 , 1, ['correct'], [47], [13]] 

Среди нескольких других вещей (и на основе успешного решения в Python) я попробовал следующий код:

def filterDF(number, code, d1, d2):
    dataFiltered = []
    numberNew = 0
    for i in range(number):
        if code[i] == 'correct':
            dataFiltered.append([d1[i],d2[i]])
            countNew += 1
    newTable = {'countNew' : countNew, 'data' : dataFiltered}
    newDf = pd.DataFrame(newTable)
    return newDf    

from pyspark.sql.types import ArrayType
filterDFudf = sqlContext.udf.register("filterDF", filterDF, "Array<double>")

df2 = df1.select(df1.number, filterDFudf(df1.number, df1.code, df1.d1, df1.d2)).alias('dataNew')

Я получил довольно длинное и не очень полезное сообщение об ошибке.То есть была следующая информация: TypeError: у объекта 'float' нет атрибута ' getitem '

Было бы здорово, если бы кто-то здесь мог показать мне, как решить эту проблему.

Ответы [ 2 ]

0 голосов
/ 10 октября 2018

В качестве альтернативного решения вы также можете использовать для понимания функции понимание списка в python:

def get_filtered_data(code, d1, d2):

    indices = [i for i, s in enumerate(code) if 'correct' in s]
    d1_ = [d1[index] for index in indices]
    d2_ = [d2[index] for index in indices]
    return [len(indices), d1_, d2_]

udf_get_filtered_data = udf(get_filtered_data, ArrayType(StringType()))

df = df.withColumn('filtered_data', udf_get_filtered_data('code', 'd1', 'd2'))

df.show () возвращает следующее

+------+--------------------+----------------+----------------+--------------------+
|number|                code|              d1|              d2|       filtered_data|
+------+--------------------+----------------+----------------+--------------------+
|     4|[correct, correct...|[33, 42, 35, 76]|[12, 35, 15, 16]|[3, [33, 42, 76],...|
|     2|    [correct, wrong]|        [47, 43]|        [13, 17]|     [1, [47], [13]]|
+------+--------------------+----------------+----------------+--------------------+

Кстати, если вы используете

dataFiltered.append([d1[i],d2[i]]) 

Это не даст вам желаемого результата, который вы указали ([33, 42, 76], [12, 35, 16]).Скорее, он даст вам ([33,12], [42,35], [76,16])

. Этот ответ выше даст вам правильные результаты в d1 и d2 в отдельном списке, как упомянуто в вопросе.

0 голосов
/ 10 октября 2018

Вы не можете вернуть фрейм данных Pandas из udf следующим образом (есть другие варианты, которые поддерживают это, но они не соответствуют вашей логике), и схема все равно не соответствует выходным данным.Переопределите свою функцию следующим образом:

def filterDF(number, code, d1, d2):
    dataFiltered = []
    countNew = 0
    for i in range(number):
        if code[i] == 'correct':
            dataFiltered.append([d1[i],d2[i]])
            countNew += 1
    return (countNew, dataFiltered)

filterDFudf = sqlContext.udf.register(
    "filterDF", filterDF, 
    "struct<countNew: long, data: array<array<long>>>"
)

Тест:

df = sqlContext.createDataFrame([
    (4 ,['correct', 'correct', 'wrong', 'correct'], [33, 42, 35, 76], [12, 35, 15, 16]),
    (2 ,['correct', 'wrong'], [47, 43], [13, 17])
]).toDF("number", "code", "d1", "d2")

df.select(filterDFudf("number", "code", "d1", "d2")).show()
# +------------------------------+                                                
# |filterDF(number, code, d1, d2)|
# +------------------------------+
# |          [3, [[33, 12], [4...|
# |               [1, [[47, 13]]]|
# +------------------------------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...