Проверка Pyspark, если какая-либо из строк больше нуля - PullRequest
0 голосов
/ 10 января 2020

Я хотел отфильтровать строки с нулевыми значениями для всех столбцов в списке.

Предположим, например, что у нас есть следующие df,

df = spark.createDataFrame([(0, 1, 1, 2,1), (0, 0, 1, 0, 1), (1, 0, 1, 1 ,1)], ['a', 'b', 'c', 'd', 'e'])
+---+---+---+---+---+                                                           
|  a|  b|  c|  d|  e|
+---+---+---+---+---+
|  0|  1|  1|  2|  1|
|  0|  0|  1|  0|  1|
|  1|  0|  1|  1|  1|
+---+---+---+---+---+

и список столбцы - это ['a', 'b', 'd'], поэтому отфильтрованный фрейм данных должен быть

+---+---+---+---+---+                                                           
|  a|  b|  c|  d|  e|
+---+---+---+---+---+
|  0|  1|  1|  2|  1|
|  1|  0|  1|  1|  1|
+---+---+---+---+---+

Это то, что я пробовал,

df = df.withColumn('total', sum(df[col] for col in ['a', 'b', 'd']))
df = df.filter(df.total > 0).drop('total')

Это прекрасно работает для небольших наборов данных, но завершается неудачно со следующей ошибкой, если col_list очень длинный со следующей ошибкой.

ava.lang.StackOverflowErrorat org. apache .spark. sql .catalyst.analysis. ResolveLambdaVariables.org $ apache $ spark $ sql $ катализатор $ анализ $ ResolveLambdaVariables $$ Resolution (выше ...

Я могу подумать о решении pandas udf, но мой df очень большой, и это может быть узким местом.

Редактировать:

При использовании ответа @ Psidom я получаю следующую ошибку

py4j.protocol.Py4JJavaError: Произошла ошибка при вызове o2508.filter.: java .lang.StackOverflowError at org. apache .spark. sql .cata lyst.expressions.Expression.references (Expression. scala: 88) в org. apache .spark. sql .catalyst.expressions.Expression $$ anonfun $ ссылается на $ 1.apply (выражение. scala: 88 ) в орг. apache .spark. sql .catalyst.expressions.Expression $$ anonfun $ reference $ 1.apply (Expression. scala: 88) в scala .collection.TraversableLike $$ anonfun $ flatMap $ 1 .apply (TraversableLike. scala: 241) в scala .collection.TraversableLike $$ anonfun $ flatMap $ 1.apply (TraversableLike. scala: 241) в scala .collection.immutable.List.foreach (список) . scala: 392) в scala .collection.TraversableLike $ class.flatMap (TraversableLike. scala: 241) в scala .collection.immutable.List.flatMap (List. scala: 355)

Ответы [ 4 ]

1 голос
/ 10 января 2020

Вы можете передать столбцы как массив в UDF , а затем проверить, являются ли все значения нулями или нет, и затем применить фильтр:

from pyspark.sql.types import BooleanType
from pyspark.sql.functions import udf, array, col

all_zeros_udf = udf(lambda arr: arr.count(0) == len(arr), BooleanType())

df = spark.createDataFrame([(0, 1, 1, 2,1), (0, 0, 1, 0, 1), (1, 0, 1, 1 ,1)], ['a', 'b', 'c', 'd', 'e'])

df
.withColumn('all_zeros', all_zeros_udf(array('a', 'b', 'd'))) # pass the columns as array
.filter(~col('all_zeros')) # Filter the columns where all values are NOT zeros
.drop('all_zeros')  # Drop the column
.show()

Результат:

+---+---+---+---+---+
|  a|  b|  c|  d|  e|
+---+---+---+---+---+
|  0|  1|  1|  2|  1|
|  1|  0|  1|  1|  1|
+---+---+---+---+---+
1 голос
/ 10 января 2020

Вот другое решение. Не пробовал на большом наборе столбцов, пожалуйста, дайте мне знать, если это работает.

df = spark.createDataFrame([(0, 1, 1, 2,1), (0, 0, 1, 0, 1), (1, 0, 1, 1 ,1)], ['a', 'b', 'c', 'd', 'e'])
df.show()

+---+---+---+---+---+
|  a|  b|  c|  d|  e|
+---+---+---+---+---+
|  0|  1|  1|  2|  1|
|  0|  0|  1|  0|  1|
|  1|  0|  1|  1|  1|
+---+---+---+---+---+

df = df.withColumn("Concat_cols" , F.concat(*list_of_cols)) # concat the list of columns 
df.show()

+---+---+---+---+---+-----------+
|  a|  b|  c|  d|  e|Concat_cols|
+---+---+---+---+---+-----------+
|  0|  1|  1|  2|  1|        012|
|  0|  0|  1|  0|  1|        000|
|  1|  0|  1|  1|  1|        101|
+---+---+---+---+---+-----------+

pattern =  '0' * len(list_of_cols) 

df1 = df.where(df['Concat_cols'] != pattern) # pattern will be 0's and the number will be equal to length of the columns list.
df1.show()

    +---+---+---+---+---+-----------+
    |  a|  b|  c|  d|  e|Concat_cols|
    +---+---+---+---+---+-----------+
    |  0|  1|  1|  2|  1|        012|
    |  1|  0|  1|  1|  1|        101|
    +---+---+---+---+---+-----------+
1 голос
/ 10 января 2020

functools.reduce может быть полезно здесь:

df = spark.createDataFrame([(0, 1, 1, 2,1), (0, 0, 1, 0, 1), (1, 0, 1, 1 ,1)], 
     ['a', 'b', 'c', 'd', 'e'])
cols = ['a', 'b', 'd']

Используйте reduce для создания выражения фильтра:

from functools import reduce
predicate = reduce(lambda a, b: a | b, [df[x] != 0 for x in cols])

print(predicate)
# Column<b'(((NOT (a = 0)) OR (NOT (b = 0))) OR (NOT (d = 0)))'>

Затем filter с predicate:

df.where(predicate).show()
+---+---+---+---+---+
|  a|  b|  c|  d|  e|
+---+---+---+---+---+
|  0|  1|  1|  2|  1|
|  1|  0|  1|  1|  1|
+---+---+---+---+---+
0 голосов
/ 10 января 2020

Если целью является просто проверка вхождения 0 во всех столбцах и списки вызывают проблемы, возможно, объедините их 1000 за раз, а затем проверьте ненулевое вхождение.

from pyspark.sql import functions as F

# all or whatever columns you would like to test.
columns = df.columns 

# Columns required to be concatenated at a time.
split = 1000 

# list of 1000 columns concatenated into a single column
blocks = [F.concat(*columns[i*split:(i+1)*split]) 
            for i in range((len(columns)+split-1)//split)]

# where expression here replaces zeroes to check if the resultant string is blank or not.
(df.select("*")
    .where(F.regexp_replace(F.concat(*blocks).alias("concat"), "0", "") != "" )
    .show(10, False))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...