Контекст
В pySpark Я передаю переменную всем узлам со следующим кодом:
sc = spark.sparkContext # Get context
# Extract stopwords from a file in hdfs
# The result looks like stopwords = {"and", "fu", "bar" ... }
stopwords = set([line[0] for line in csv.reader(open(SparkFiles.get("stopwords.txt"), 'r'))])
# The set of stopwords is broadcasted now
stopwords = sc.broadcast(stopwords)
После трансляции stopwords
Я хочу сделать его доступным в mapPartitions
:
# Some dummy-dataframe
df = spark.createDataFrame([(["TESTA and TESTB"], ), (["TESTB and TESTA"], )], ["text"])
# The method which will be applied to mapPartitions
def stopwordRemoval(partition, passed_broadcast):
"""
Removes stopwords from "text"-column.
@partition: iterator-object of partition.
@passed_stopwords: Lookup-table for stopwords.
"""
# Now the broadcast is passed
passed_stopwords = passed_broadcast.value
for row in partition:
yield [" ".join((word for word in row["text"].split(" ") if word not in passed_stopwords))]
# re-partitioning in order to get mapPartitions working
df = df.repartition(2)
# Now apply the method
df = df.select("text").rdd \
.mapPartitions(lambda partition: stopwordRemoval(partition, stopwords)) \
.toDF()
# Result
df.show()
#Result:
+------------+
| text |
+------------+
|TESTA TESTB |
|TESTB TESTA |
+------------+
Вопросы
Даже при том, что это работает Я не совсем уверен, что это правильное использование переменных вещания. Итак, мои вопросы:
- Правильно ли выполняется трансляция, когда я передаю ее
mapParitions
демонстрационным способом? - Полезно ли использовать трансляцию в
mapParitions
, так как стоп-слова будут так или иначе, распространяется с функцией по всем узлам (стоп-слова никогда не используются повторно)?
Второй вопрос касается этого вопроса , который частично отвечает на мой вопрос. Так или иначе, в пределах специфики это отличается; вот почему я решил также задать этот вопрос.