Ну, в конце концов, не нужно было использовать виртуальный env, но не мог избежать распространения полной копии python (содержащей необходимые зависимости) на все узлы.
Сначала собрал полную копию python(действительно использовал conda env, но вы могли бы использовать другие способы):
conda create --prefix /home/me/env_conda_for_pyarrow
source activate /home/me/env_conda_for_pyarrow
conda install numpy
conda install pyarrow
в этом конкретном случае перед установкой пришлось открыть канал conda-forge, чтобы получить последние версии.
Во-вторых, распакуйте zip-дистрибутив:
zip -r env_conda_for_pyarrow.zip env_conda_for_pyarrow
Затем используйте архивы, чтобы распространить zip и env var PYSPARK_PYTHON, чтобы указать на него:
import os, sys
os.environ['PYSPARK_PYTHON']="dist_python/env_conda_for_pyarrow/bin/python"
import pyspark
spark = \
pyspark.sql.SparkSession.builder.appName("pysaprk_python")\
.config("spark.yarn.dist.archives", "env_conda_for_pyarrow.zip#dist_python")\
.getOrCreate()
print spark.version, spark.sparkContext.master
Вот и все, готово,Вот несколько сценариев, которые я использовал для тестирования:
def list_nodes_dir(x): # hack to see workers file dirs
import os
return os.listdir('dist_python')
spark.sparkContext.parallelize(range(1), 1).map(list_nodes_dir).collect()
def npv(x): # hack to see workers numpy version
import numpy as np
return np.__version__
set(spark.sparkContext.parallelize(range(10), 10).map(npv).collect())
# spark documents example
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import IntegerType, StringType
slen = pandas_udf(lambda s: s.str.len(), IntegerType())
@pandas_udf(StringType())
def to_upper(s):
return s.str.upper()
@pandas_udf("integer", PandasUDFType.SCALAR)
def add_one(x):
return x + 1
df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
df.select(slen("name").alias("slen(name)"), to_upper("name"),
add_one("age")).show()