Spark Python List Filter - Исключение, отклоненное сокетом - PullRequest
0 голосов
/ 26 ноября 2018

Мне кажется, я знаю, что с этим происходит, но я относительно новичок в области искры в databricks и могу со вторым мнением.

У меня есть пара коротких фрагментов кода.Код состоит из 90000 записей, используемых для фильтрации набора данных объемом 6 ГБ.

Первый, использующий фильтр в списке, который потерпел неудачу с ошибкой отклоненного сокета.Второй, использующий объединение, работает нормально.

Я думаю, причина, по которой первый отказывает, состоит в том, что фильтр использует список, который собирается в драйвере, и, следовательно, для выполнения фильтра он должен собирать все данные вдрайвер, в котором точка сокета истекает время сериализации JVM для Python.Где вторым, использующим объединение, является СДР, и поэтому он распространяет объединение, которое гораздо более эффективно и успешно.Может кто-нибудь поделиться своими мыслями?

Вещи, которые я пробовал:

Я пытался распространить переменную списка, но это не удалось с включенным AttributeError _get_object_id

Стрелка - не имеет никакого эффектане удается с

Данные здесь: https://fasttext.cc/docs/en/english-vectors.html

ПРИМЕЧАНИЕ: стрелка включена, но не помогает - та же ошибка, что и без

from pyspark.sql.functions import 
split
embeddings_index={} 
f = sqlContext.read.format('csv').options(inferSchema='true').load("/mnt/ex_data/wiki.en.vec") 
for i in range(301): f = f.withColumn('val_' + str(i), split(f['_c0'], ' ').getItem(i)) 
f = f.drop('_c0') 
f = f.na.drop(subset=["val_7"]) 
wl = sc.broadcast(list( 
    f.select('val_0').limit(90000).toPandas()['val_0'] 
)) 
f_fil = f.filter((f.val_0.isin(wl))) 
em_index = f_fil.toPandas().set_index('val_0').T.to_dict('list')
Exception: could not open socket: ["tried to connect to ('127.0.0.1', 36543), but an error occured: [Errno 111] Connection refused"]
---------------------------------------------------------------------------
Exception                                 Traceback (most recent call last)
<command-537020040227877> in <module>()
    18 f_fil = f.filter((f.val_0.isin(wl)))
    19 #f_fil.explain()
---> 20 em_index = f_fil.toPandas().set_index('val_0').T.to_dict('list')
    21 #wlb.unpersist()
    22 #wlb.destroy()

/databricks/spark/python/pyspark/sql/dataframe.py in toPandas(self)
2137                         _check_dataframe_localize_timestamps
2138                     import pyarrow
-> 2139                     batches = self._collectAsArrow()
2140                     if len(batches) > 0:
2141                         table = pyarrow.Table.from_batches(batches)
/databricks/spark/python/pyspark/sql/dataframe.py in _collectAsArrow(self)
2197         with SCCallSiteSync(self._sc) as css:
2198             sock_info = self._jdf.collectAsArrowToPython()
-> 2199         return list(_load_from_socket(sock_info, ArrowStreamSerializer()))
2200 
2201     ##########################################################################################

/databricks/spark/python/pyspark/rdd.py in _load_from_socket(sock_info, serializer)
    142 
    143 def _load_from_socket(sock_info, serializer):
--> 144     (sockfile, sock) = local_connect_and_auth(*sock_info)
    145     # The RDD materialization time is unpredicable, if we set a timeout for socket reading
    146     # operation, it will very possibly fail. See SPARK-18281.

/databricks/spark/python/pyspark/java_gateway.py in local_connect_and_auth(port, auth_secret)
    176             sock = None
    177     else:
--> 178         raise Exception("could not open socket: %s" % errors)
    179 
    180 

Объединениеоднако работает нормально - список был преобразован обратно в СДР:

from pyspark.sql.functions import split
embeddings_index={}
f = sqlContext.read.format('csv').options(inferSchema='true').load("/mnt/ex_data/wiki.en.vec")
for i in range(301): f = f.withColumn('val_' + str(i), split(f['_c0'], ' ').getItem(i))
f = f.drop('_c0')
f = f.na.drop(subset=["val_7"])
wl = list(
    f.select('val_0').limit(90000).toPandas()['val_0']
)

from pyspark.sql.types import StringType
from pyspark.sql.functions import col
wordsdf = spark.createDataFrame(wl, StringType())
wordsdf = wordsdf.select(col("value").alias("val_0"))
keep_words = f.join(wordsdf, ['val_0'])
em_index = keep_words.toPandas().set_index('val_0').T.to_dict('list')
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...