Есть ли способ заставить работников Spark использовать версию с распределенной версией вместо установленной на них? - PullRequest
0 голосов
/ 16 декабря 2018

Ситуация выглядит следующим образом: работая на корпоративном кластере с поддержкой версии 2.3, я хочу запустить pandas_udf, для которого требуется pyarrow, для которого требуется значение numpy 0,14 (AFAIK).Был в состоянии распространять pyarrow (я думаю, нет способа проверить эти 100%):

 pyspark.sql.SparkSession.builder.appName("pandas_udf_poc").config("spark.executor.instances","2")\
                                              .config("spark.executor.memory","8g")\
                                              .config("spark.driver.memory","8g")\
                                              .config("spark.driver.maxResultSize","8g")\
                                              .config("py-files", "pyarrow_depnd.zip")\
                                              .getOrCreate()  

spark.sparkContext.addPyFile("pyarrow_depnd.zip")

zip - это результат установки pip в dir и архивирования.

Но pyarrow делаетне играть вместе с узлами 0,13, я думаю, я мог бы попытаться распределить полный env по всем узлам, но мой вопрос, есть ли способ избежать этого и заставить узел использовать разный Numpy (который уже распределен вzar пиарроу)

Спасибо

1 Ответ

0 голосов
/ 24 декабря 2018

Ну, в конце концов, не нужно было использовать виртуальный env, но не мог избежать распространения полной копии python (содержащей необходимые зависимости) на все узлы.

Сначала собрал полную копию python(действительно использовал conda env, но вы могли бы использовать другие способы):

conda create --prefix /home/me/env_conda_for_pyarrow
source activate /home/me/env_conda_for_pyarrow
conda install numpy 
conda install pyarrow

в этом конкретном случае перед установкой пришлось открыть канал conda-forge, чтобы получить последние версии.

Во-вторых, распакуйте zip-дистрибутив:

zip -r env_conda_for_pyarrow.zip env_conda_for_pyarrow

Затем используйте архивы, чтобы распространить zip и env var PYSPARK_PYTHON, чтобы указать на него:

import os, sys
os.environ['PYSPARK_PYTHON']="dist_python/env_conda_for_pyarrow/bin/python"

import pyspark
spark = \
pyspark.sql.SparkSession.builder.appName("pysaprk_python")\
.config("spark.yarn.dist.archives", "env_conda_for_pyarrow.zip#dist_python")\
.getOrCreate()

print spark.version, spark.sparkContext.master

Вот и все, готово,Вот несколько сценариев, которые я использовал для тестирования:

def list_nodes_dir(x): # hack to see workers file dirs
    import os
    return os.listdir('dist_python')

spark.sparkContext.parallelize(range(1), 1).map(list_nodes_dir).collect()    



def npv(x): # hack to see workers numpy version
    import numpy as np
    return np.__version__

set(spark.sparkContext.parallelize(range(10), 10).map(npv).collect())



# spark documents example
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import IntegerType, StringType
slen = pandas_udf(lambda s: s.str.len(), IntegerType())  

@pandas_udf(StringType())  
def to_upper(s):
    return s.str.upper()

@pandas_udf("integer", PandasUDFType.SCALAR)  
def add_one(x):
    return x + 1

df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))  
df.select(slen("name").alias("slen(name)"), to_upper("name"), 
add_one("age")).show() 
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...