Мне кажется, я знаю, что с этим происходит, но я относительно новичок в области искры в databricks и могу со вторым мнением.
У меня есть пара коротких фрагментов кода.Код состоит из 90000 записей, используемых для фильтрации набора данных объемом 6 ГБ.
Первый, использующий фильтр в списке, который потерпел неудачу с ошибкой отклоненного сокета.Второй, использующий объединение, работает нормально.
Я думаю, причина, по которой первый отказывает, состоит в том, что фильтр использует список, который собирается в драйвере, и, следовательно, для выполнения фильтра он должен собирать все данные вдрайвер, в котором точка сокета истекает время сериализации JVM для Python.Где вторым, использующим объединение, является СДР, и поэтому он распространяет объединение, которое гораздо более эффективно и успешно.Может кто-нибудь поделиться своими мыслями?
Вещи, которые я пробовал:
Я пытался распространить переменную списка, но это не удалось с включенным AttributeError _get_object_id
Стрелка - не имеет никакого эффектане удается с
Данные здесь: https://fasttext.cc/docs/en/english-vectors.html
ПРИМЕЧАНИЕ: стрелка включена, но не помогает - та же ошибка, что и без
from pyspark.sql.functions import
split
embeddings_index={}
f = sqlContext.read.format('csv').options(inferSchema='true').load("/mnt/ex_data/wiki.en.vec")
for i in range(301): f = f.withColumn('val_' + str(i), split(f['_c0'], ' ').getItem(i))
f = f.drop('_c0')
f = f.na.drop(subset=["val_7"])
wl = sc.broadcast(list(
f.select('val_0').limit(90000).toPandas()['val_0']
))
f_fil = f.filter((f.val_0.isin(wl)))
em_index = f_fil.toPandas().set_index('val_0').T.to_dict('list')
Exception: could not open socket: ["tried to connect to ('127.0.0.1', 36543), but an error occured: [Errno 111] Connection refused"]
---------------------------------------------------------------------------
Exception Traceback (most recent call last)
<command-537020040227877> in <module>()
18 f_fil = f.filter((f.val_0.isin(wl)))
19 #f_fil.explain()
---> 20 em_index = f_fil.toPandas().set_index('val_0').T.to_dict('list')
21 #wlb.unpersist()
22 #wlb.destroy()
/databricks/spark/python/pyspark/sql/dataframe.py in toPandas(self)
2137 _check_dataframe_localize_timestamps
2138 import pyarrow
-> 2139 batches = self._collectAsArrow()
2140 if len(batches) > 0:
2141 table = pyarrow.Table.from_batches(batches)
/databricks/spark/python/pyspark/sql/dataframe.py in _collectAsArrow(self)
2197 with SCCallSiteSync(self._sc) as css:
2198 sock_info = self._jdf.collectAsArrowToPython()
-> 2199 return list(_load_from_socket(sock_info, ArrowStreamSerializer()))
2200
2201 ##########################################################################################
/databricks/spark/python/pyspark/rdd.py in _load_from_socket(sock_info, serializer)
142
143 def _load_from_socket(sock_info, serializer):
--> 144 (sockfile, sock) = local_connect_and_auth(*sock_info)
145 # The RDD materialization time is unpredicable, if we set a timeout for socket reading
146 # operation, it will very possibly fail. See SPARK-18281.
/databricks/spark/python/pyspark/java_gateway.py in local_connect_and_auth(port, auth_secret)
176 sock = None
177 else:
--> 178 raise Exception("could not open socket: %s" % errors)
179
180
Объединениеоднако работает нормально - список был преобразован обратно в СДР:
from pyspark.sql.functions import split
embeddings_index={}
f = sqlContext.read.format('csv').options(inferSchema='true').load("/mnt/ex_data/wiki.en.vec")
for i in range(301): f = f.withColumn('val_' + str(i), split(f['_c0'], ' ').getItem(i))
f = f.drop('_c0')
f = f.na.drop(subset=["val_7"])
wl = list(
f.select('val_0').limit(90000).toPandas()['val_0']
)
from pyspark.sql.types import StringType
from pyspark.sql.functions import col
wordsdf = spark.createDataFrame(wl, StringType())
wordsdf = wordsdf.select(col("value").alias("val_0"))
keep_words = f.join(wordsdf, ['val_0'])
em_index = keep_words.toPandas().set_index('val_0').T.to_dict('list')