Итак, предполагая, что вы не можете собрать термины замены rdd, но также предполагая, что термины замены - это одно слово:
Сначала вам нужно расклеить текст (И вспомнить словопорядок).
Затем вы выполняете объединение влево, чтобы заменить слова.
Затем вы заново собираете исходный текст.
replacement_terms_rdd = sc.parallelize([("replace", "replacement1"),
("text", "replacement2"),
("is", "replacement3")])
text_rdd = sc.parallelize([(1, "here is some text to replace with terms"),
(2, "text to replace with terms "),
(3, "text"),
(4, "here is some text to replace"),
(5, "text to replace")])
print (text_rdd\
.flatMap(lambda x: [(y[1], (x[0], y[0])) for y in enumerate(x[1].split())] )\
.leftOuterJoin(replacement_terms_rdd)\
.map(lambda x: (x[1][0][0], (x[1][0][1], x[1][1] or x[0]) ))\
.groupByKey().mapValues(lambda x: " ".join([y[1] for y in sorted(x)]))\
.collect())
Результат:
[(1, 'here replacement3 some replacement2 to replacement1 with terms'), (2, 'replacement2 to replacement1 with terms'), (3, 'replacement2'), (4, 'here replacement3 some replacement2 to replacement1'), (5, 'replacement2 to replacement1')]