pyspark: возвращает столбцы, в которых все ячейки соответствуют регулярному выражению. - PullRequest
2 голосов
/ 10 октября 2019

У меня есть искровой фрейм данных с несколькими столбцами, и каждый столбец содержит строку.

Например, ввод: +--------------+--------------+--------------+--------------+ | c1| c2 | c3| c4| +--------------+--------------+--------------+--------------+ |11 - 12 - 1993| 4 | 4 | 2014 | 8 - 7 - 2013 | null | |12 / 6 / 1965 | 8 - 6 - 2013 | date missing |11 - 12 - 1993| |10 / 5 / 2001 | 7 - 11 - 2011| 4 | 5 | 2015 | 10 / 5 / 2001| +--------------+--------------+--------------+--------------+

Мне нужно вернуть столбцы, в которых все значения соответствуют определенному регулярному выражениюpattern.

В случае этого примера я должен вернуть все столбцы, в которых все значения являются действительными датами. То есть все значения в столбцах C1 и C2 имеют действительную дату (независимо от их формата), должны быть возвращены в этом случае.

Например, Вывод +--------------+--------------+ | c1| c2 | +--------------+--------------+ |11 - 12 - 1993| 4 | 4 | 2014 | |12 / 6 / 1965 | 8 - 6 - 2013 | |10 / 5 / 2001 | 7 - 11 - 2011| +--------------+--------------+

У меня есть регулярное выражение. Каков был бы лучший способ сделать это? Я хочу найти наиболее эффективный способ сделать это.

1 Ответ

1 голос
/ 10 октября 2019

В одну сторону без использования регулярного выражения:

from pyspark.sql.functions import coalesce, to_date

# list of all expected date format
fmts = ['MM - d - yyyy', 'MM / d / yyyy', 'MM | d | yyyy']

# columns to check
cols = df.columns

# convert columns into date using coalesce and to_date on all available fmts 
# convert the resulting column to StringType (as df.summary() doesn't work on DateType)
df1 = df.select([ coalesce(*[to_date(c, format=f) for f in fmts]).astype('string').alias(c) for c in cols])
df1.show()
+----------+----------+----------+----------+
|        c1|        c2|        c3|        c4|
+----------+----------+----------+----------+
|1993-11-12|2014-04-04|2013-08-07|      null|
|1965-12-06|2013-08-06|      null|1993-11-12|
|2001-10-05|2011-07-11|2015-04-05|2001-10-05|
+----------+----------+----------+----------+

Теперь ваша задача становится подсчитываемой, если столбцы содержат нулевые значения.

# get all Number of rows in df
N = df.count()
# 3

# use df.summary('count') find all non-null #Row for each columns
df1.summary('count').show()                                                                                        
+-------+---+---+---+---+
|summary| c1| c2| c3| c4|
+-------+---+---+---+---+
|  count|  3|  3|  2|  2|
+-------+---+---+---+---+

найти имена столбцов, имеющие count == N:

cols_keep = [ c for c,v in df1.summary('count').select(cols).first().asDict().items() if int(v) == N ]
# ['c1', 'c2']

df_new = df.select(cols_keep)

используйте регулярное выражение

Если вы хотите использовать свое регулярное выражение для выполнения этой задачи:

from pyspark.sql.functions import regexp_extract

# pattern should be very complex in order to effectively validate dates. this one is just for testing
ptn = r'\d+ [-/|] \d+ [-/|] \d+'

df1 = df.select([ regexp_extract(c, ptn, 0).alias(c) for c in cols ] ).replace('', None)
+--------------+-------------+------------+--------------+
|            c1|           c2|          c3|            c4|
+--------------+-------------+------------+--------------+
|11 - 12 - 1993| 4 | 4 | 2014|8 - 7 - 2013|          null|
| 12 / 6 / 1965| 8 - 6 - 2013|        null|11 - 12 - 1993|
| 10 / 5 / 2001|7 - 11 - 2011|4 | 5 | 2015| 10 / 5 / 2001|
+--------------+-------------+------------+--------------+

, тогда это становится таким же, как указано выше, для поиска и исключения столбцов, содержащихнулевые значения.

Примечание:

вы можете сделать это за один проход без использования df.summary():

from pyspark.sql.functions import regexp_extract, sum, when 

df1 = df.select([ sum(when(regexp_extract(c, ptn, 0)== '',1).otherwise(0)).alias(c) for c in cols ] )
+---+---+---+---+
| c1| c2| c3| c4|
+---+---+---+---+
|  0|  0|  1|  1|
+---+---+---+---+

cols_keep = [ c for c,v in df1.first().asDict().items() if not v ]

аналогично предыдущему методу безиспользуя регулярное выражение.

...