Извлечение всех совпадений из разных столбцов pyspark в зависимости от некоторых условий - PullRequest
4 голосов
/ 29 октября 2019

Мне нужно извлечь некоторые коды из столбцов кадра данных, который выглядит следующим образом:

+---------+--------------------------------+--------------------+------+
|first    |second                          |third               |num   |
+---------+--------------------------------+--------------------+------+
|AB12a    |xxxxxx                          |some other data     |100000|
|yyyyyyy  |XYZ02, but possibly also GFH11b |Look at second col* |120000|
+---------+--------------------------------+--------------------+------+

Коды следуют регулярному выражению "^([A-Z]+[0-9]+[a-z]*)" и разбросаны по двум столбцам (first и * 1006). *) в зависимости от того, содержит ли звездочка third. Поскольку в каждом столбце может быть несколько кодов, мне нужны все совпадения регулярных выражений в массиве. В приведенном выше примере мне нужно извлечь AB12a из first и [XYZ02, GFH11b] из second.

Я обнаружил, что множественные совпадения не поддерживаются функцией pyspark по умолчанию regexp_extract (https://issues.apache.org/jira/browse/SPARK-24884), поэтому я определил свой собственный regexp_extract_all UDF:

from pyspark.sql.types import *
from pyspark.sql.functions import *
import re

def regexp_extract_all(s, pattern):
    pattern = re.compile(pattern, re.M)
    all_matches = re.findall(pattern, s)
    return all_matches

pattern = "^([A-Z]+[0-9]+[a-z]*)"

udf_regexp_extract_all = udf(regexp_extract_all, ArrayType(StringType()))

Мне удалось заставить работать UDF, если применить его к каждому столбцу отдельно:

# this extracts AB12a from first
df = df.withColumn("code", udf_regexp_extract_all("first", lit(pattern)))

# this extracts [XYZ02, GFH11b] from second
df = df.withColumn("code", udf_regexp_extract_all("second", lit(pattern)))

Но я получаю TypeError: expected string or buffer при работе в предложении when:

# this gives at runtime TypeError: expected string or buffer
df = df.withColumn("code", when(col("third").like("%*%"), 
                           udf_regexp_extract_all("second", lit(pattern)))
                           .otherwise(udf_regexp_extract_all("first", lit(pattern))))

Я думаю, что, вероятно, меня заваливают типами во время выполнения, потому что что-то происходит в предложении when, для которого нужно определить мой UDFнемного по-другому.

Есть идеи?

Ответы [ 3 ]

2 голосов
/ 29 октября 2019

Ваш код работает отлично. Просто измените условие когда, как показано ниже

df.withColumn("code1", when(df.third.rlike("\*") == True, 
                           udf_regexp_extract_all("second", lit("([A-Z]+[0-9]+[a-z]*)")))
                           .otherwise(udf_regexp_extract_all("first", lit("([A-Z]+[0-9]+[a-z]*)")))).show(10, False)

+-------+-------------------------------+-------------------+------+-------+
|first  |second                         |third              |num   |code1  |
+-------+-------------------------------+-------------------+------+-------+
|AB12a  |xxxxxx                         |some other data    |100000|[AB12a]|
|yyyyyyy|XYZ02, but possibly also GFH11b|Look at second col*|120000|[XYZ02, GHF11b]|
+-------+-------------------------------+-------------------+------+-------+
1 голос
/ 30 октября 2019

Ваш код слишком многословен и может быть упрощен некоторыми корректировками для лучшей читабельности:

Метод 1: переместить логику в udf

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
import re

pattern = re.compile(r'\b([A-Z]+[0-9]+[a-z]*)\b')

# use s2 if s3 contains '*', otherwise use s1 (including s3 is NULL)
def regexp_extract_all(s1, s2, s3, pattern):
  try:
    return re.findall(pattern, s2 if '*' in (s3 or '') else s1)
  except:
    return []

udf_regexp_extract_all = udf(lambda x,y,z: regexp_extract_all(x, y, z, pattern), ArrayType(StringType()))

df.withColumn("code", udf_regexp_extract_all('first', 'second', 'third')).show()
#+-------+--------------------+-------------------+------+---------------+
#|  first|              second|              third|   num|           code|
#+-------+--------------------+-------------------+------+---------------+
#|  AB12a|              xxxxxx|    some other data|100000|        [AB12a]|
#|yyyyyyy|XYZ02, but possib...|Look at second col*|120000|[XYZ02, GFH11b]|
#|   null|                 111|                222|  1233|             []|
#+-------+--------------------+-------------------+------+---------------+

Метод-2: переместить when () в аргумент функции udf

from pyspark.sql.functions import udf, when, col

def regexp_extract_all(x, pattern):
    return re.findall(pattern, x or '')

udf_regexp_extract_all = udf(lambda x: regexp_extract_all(x, pattern), ArrayType(StringType()))

df.withColumn('code', udf_regexp_extract_all(when(col('third').like('%*%'), col('second')).otherwise(col('first')))) \
  .show()

Или с помощью выражения Spark SQL:

from pyspark.sql.functions import udf, expr

df.withColumn('code', udf_regexp_extract_all(expr("IF(third like '%*%', second, first)"))).show()
0 голосов
/ 29 октября 2019

В этом случае TypeError происходит, потому что в столбцах есть нулевые значения. Я не включил их в MWE, но это может (и имеет место!) Происходить в реальных данных.

Простой способ решить эту проблему - это дополнительное условие в регулярном выражении UDF, обеспечивающее только re.findallработает на строковых объектах:

def regexp_extract_all(s, pattern):
    s = "" if s is None else s
    pattern = re.compile(pattern, re.M)
    all_matches = re.findall(pattern, s)
    return all_matches
...