Pyspark заменяет несколько строк в RDD - PullRequest
0 голосов
/ 22 декабря 2018

Я хотел бы заменить несколько строк в pyspark rdd.Я хотел бы заменить эти строки в порядке длины - от самой длинной к самой короткой.В конечном итоге операция заменит большой объем текста, поэтому следует учитывать хорошую производительность.

Пример проблемы:

В приведенном ниже примере я хотел бы заменитьстроки:

 replace, text, is

с, в соответствующем порядке (от самого длинного к самому короткому):

 replacement1, replacement2, replacement3

, т.е. если найдена строка replace , ее следует заменитьс replace1 и в этом примере первым будет произведен поиск и замена.

Строки также будут сохранены как pyspark rdd следующим образом:

+---------+------------------+
| string  | replacement_term |
+---------+------------------+
| replace | replacement1     |
+---------+------------------+
| text    | replacement2     |
+---------+------------------+
| is      | replacement3     |
+---------+------------------+

См. примерrdd, который необходимо заменить приведенными выше терминами:

+----+-----------------------------------------+
| id | text                                    |
+----+-----------------------------------------+
| 1  | here is some text to replace with terms |
+----+-----------------------------------------+
| 2  | text to replace with terms              |
+----+-----------------------------------------+
| 3  | text                                    |
+----+-----------------------------------------+
| 4  | here is some text to replace            |
+----+-----------------------------------------+
| 5  | text to replace                         |
+----+-----------------------------------------+

И я хотел бы заменить, создавая вывод rdd следующим образом:

+----+----------------------------------------------------------------+
| id | text                                                           |
+----+----------------------------------------------------------------+
| 1  | here replacement3 some replacement2 to replacement1 with terms |
+----+----------------------------------------------------------------+
| 2  | replacement2 to replacement1 with terms                        |
+----+----------------------------------------------------------------+
| 3  | replacement2                                                   |
+----+----------------------------------------------------------------+
| 4  | here replacement3 some replacement2 to replacement1            |
+----+----------------------------------------------------------------+
| 5  | replacement2 to replacement1                                   |
+----+----------------------------------------------------------------+

Спасибо за помощь.

Ответы [ 2 ]

0 голосов
/ 22 декабря 2018

Итак, предполагая, что вы не можете собрать термины замены rdd, но также предполагая, что термины замены - это одно слово:

Сначала вам нужно расклеить текст (И вспомнить словопорядок).

Затем вы выполняете объединение влево, чтобы заменить слова.

Затем вы заново собираете исходный текст.

replacement_terms_rdd = sc.parallelize([("replace", "replacement1"),
                                        ("text", "replacement2"),
                                        ("is", "replacement3")])

text_rdd = sc.parallelize([(1, "here is some text to replace with terms"),
                          (2, "text to replace with terms "),
                          (3, "text"),
                          (4, "here is some text to replace"),
                          (5, "text to replace")])

print (text_rdd\
.flatMap(lambda x: [(y[1], (x[0], y[0])) for y in enumerate(x[1].split())] )\
.leftOuterJoin(replacement_terms_rdd)\
.map(lambda x: (x[1][0][0], (x[1][0][1], x[1][1] or x[0]) ))\
.groupByKey().mapValues(lambda x: " ".join([y[1] for y in sorted(x)]))\
.collect())

Результат:

[(1, 'here replacement3 some replacement2 to replacement1 with terms'), (2, 'replacement2 to replacement1 with terms'), (3, 'replacement2'), (4, 'here replacement3 some replacement2 to replacement1'), (5, 'replacement2 to replacement1')]
0 голосов
/ 22 декабря 2018

Следующий фрагмент кода работает для Spark / Scala & DataFrame s API .Попробуйте адаптировать его к RDD & PySpark

// imports
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

// spark-session (not needed if your'e in spark-shell)
implicit val spark: SparkSession = SparkSession.builder().appName("SO").getOrCreate()

// you'll be reading it from somewhere
val dfToBeModified: DataFrame = spark.createDataFrame(
  rowRDD = spark.sparkContext.parallelize(List(
    Row(1, "here is some text to replace with terms"),
    Row(2, "text to replace with terms"),
    Row(3, "text"),
    Row(4, "here is some text to replace"),
    Row(5, "text to replace")
  )),
  schema = StructType(List(
    StructField("id", IntegerType, false),
    StructField("text", StringType, false)
  ))
)

// it should preferably be read not as a dataframe but as a sequence  
val dfWithReplacements: DataFrame = spark.createDataFrame(
    rowRDD = spark.sparkContext.parallelize(List(
    Row("replace", "replacement1"),
    Row("text", "replacement2"),
    Row("is", "replacement3")
  )),
  schema = StructType(List(
    StructField("string", StringType, false),
    StructField("replacement_term", StringType, false)
  ))
)

// dfWithReplacements must not be too big or your executor will crash
val seqWithReplacements: Array[Row] = dfWithReplacements.collect()

// there you go
val dfWithModifications: DataFrame = seqWithReplacements.foldLeft(dfToBeModified) { (dfWithSomeModifications: DataFrame, row: Row) =>
    dfWithSomeModifications.withColumn("text", regexp_replace(dfWithSomeModifications("text"), row(0).toString, row(1).toString))
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...