Использование регулярного выражения в pyspark для замены, чтобы заменить строку даже внутри массива? - PullRequest
0 голосов
/ 05 апреля 2020

Существует такой синтаксис: df.withColumn ('new', regexp_replace ('old', 'str', ''))

это для замены строки в столбце.

Мой вопрос: что, если у меня есть столбец, состоящий из массивов и строки? Значение строки может иметь либо строку, либо массив, содержащий эту строку. Есть ли способ заменить эту строку независимо от того, находится она одна или внутри массива?

Ответы [ 2 ]

1 голос
/ 06 апреля 2020

Наличие столбца с несколькими типами в настоящее время не поддерживается. Тем не менее, столбец содержит массив строк, вы можете взорвать массив (https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=explode#pyspark. sql .functions.explode ), который создает строку для каждого элемента в массиве, и применять обычные выражение в новый столбец. Пример:

from pyspark import SQLContext
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()

sql_context = SQLContext(spark.sparkContext)

df = sql_context.createDataFrame([("hello world",),
                                  ("hello madam",),
                                  ("hello sir",),
                                  ("hello everybody",),
                                  ("goodbye world",)], schema=['test'])

df = df.withColumn('test', F.array(F.col('test')))

print(df.show())

df = df.withColumn('test-exploded', F.explode(F.col('test')))

df = df.withColumn('test-exploded-regex', F.regexp_replace(F.col('test-exploded'), "hello", "goodbye"))


print(df.show())

Вывод:

+-----------------+
|             test|
+-----------------+
|    [hello world]|
|    [hello madam]|
|      [hello sir]|
|[hello everybody]|
|  [goodbye world]|
+-----------------+

+-----------------+---------------+-------------------+
|             test|  test-exploded|test-exploded-regex|
+-----------------+---------------+-------------------+
|    [hello world]|    hello world|      goodbye world|
|    [hello madam]|    hello madam|      goodbye madam|
|      [hello sir]|      hello sir|        goodbye sir|
|[hello everybody]|hello everybody|  goodbye everybody|
|  [goodbye world]|  goodbye world|      goodbye world|
+-----------------+---------------+-------------------+

А если вы хотите поместить результаты обратно в массив:

df = df.withColumn('test-exploded-regex-array', F.array(F.col('test-exploded-regex')))

Вывод:

+-----------------+---------------+-------------------+-------------------------+
|             test|  test-exploded|test-exploded-regex|test-exploded-regex-array|
+-----------------+---------------+-------------------+-------------------------+
|    [hello world]|    hello world|      goodbye world|          [goodbye world]|
|    [hello madam]|    hello madam|      goodbye madam|          [goodbye madam]|
|      [hello sir]|      hello sir|        goodbye sir|            [goodbye sir]|
|[hello everybody]|hello everybody|  goodbye everybody|      [goodbye everybody]|
|  [goodbye world]|  goodbye world|      goodbye world|          [goodbye world]|
+-----------------+---------------+-------------------+-------------------------+

Надеюсь, это поможет!

Обновление

Обновлен для включения случая, когда столбец массива имеет несколько строк:

from pyspark import SQLContext
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()

sql_context = SQLContext(spark.sparkContext)

df = sql_context.createDataFrame([("hello world", "foo"),
                                  ("hello madam", "bar"),
                                  ("hello sir", "baz"),
                                  ("hello everybody", "boo"),
                                  ("goodbye world", "bah")], schema=['test', 'test2'])

df = df.withColumn('test', F.array(F.col('test'), F.col('test2'))).drop('test2')

df = df.withColumn('id', F.monotonically_increasing_id())

print(df.show())

df = df.withColumn('test-exploded', F.explode(F.col('test')))

df = df.withColumn('test-exploded-regex', F.regexp_replace(F.col('test-exploded'), "hello", "goodbye"))

df = df.groupBy('id').agg(F.collect_list(F.col('test-exploded-regex')).alias('test-exploded-regex-array'))


print(df.show())

Вывод:

+--------------------+-----------+
|                test|         id|
+--------------------+-----------+
|  [hello world, foo]|          0|
|  [hello madam, bar]| 8589934592|
|    [hello sir, baz]|17179869184|
|[hello everybody,...|25769803776|
|[goodbye world, bah]|25769803777|
+--------------------+-----------+

+-----------+-------------------------+
|         id|test-exploded-regex-array|
+-----------+-------------------------+
| 8589934592|     [goodbye madam, bar]|
|          0|     [goodbye world, foo]|
|25769803776|     [goodbye everybod...|
|25769803777|     [goodbye world, bah]|
|17179869184|       [goodbye sir, baz]|
+-----------+-------------------------+

Просто опустите столбец id, когда закончите обработку!

0 голосов
/ 05 апреля 2020

Я думаю, что это невозможно в фрейме данных в искре, поскольку фрейм данных не позволяет иметь несколько типов для столбца. Это приведет к ошибке при создании кадра данных.

Хотя вы можете сделать это с помощью RDD.

scala> val seq = Seq((1,"abc"),(2,List("abcd")))
seq: Seq[(Int, java.io.Serializable)] = List((1,abc), (2,List(abcd)))

scala> val rdd1 = sc.parallelize(seq)
rdd1: org.apache.spark.rdd.RDD[(Int, java.io.Serializable)] = ParallelCollectionRDD[2] at parallelize at <console>:26

scala> rdd1.take(2)
res1: Array[(Int, java.io.Serializable)] = Array((1,abc), (2,List(abcd)))

scala> val rdd2 = rdd1.map(x => x._2 match {
     | case v: String => (x._1, v.replaceAll("abc","def"))
     | case p: List[String] => (x._1, p.map(s => s.replaceAll("abc","def")))
     | }
     | )
rdd2: org.apache.spark.rdd.RDD[(Int, java.io.Serializable)] = MapPartitionsRDD[3] at map at <console>:25

scala> rdd2.take(2)
res2: Array[(Int, java.io.Serializable)] = Array((1,def), (2,List(defd)))

Дайте мне знать, если это поможет !!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...