Я просто не понимаю этого.
Локально я могу запустить трубопровод без проблем. Однако при запуске конвейера в кластере кажется, что я не могу использовать код из моего локального проекта:
ImportError: No module named 'themodule'
,[Ljava.lang.StackTraceElement;@282acd6d,org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/grid/3/hadoop/yarn/local/usercache/sfalk/appcache/application_1520347847754_0722/container_e48_1520347847754_0722_01_000006/pyspark.zip/pyspark/worker.py", line 159, in main
func, profiler, deserializer, serializer = read_udfs(pickleSer, infile)
File "/grid/3/hadoop/yarn/local/usercache/sfalk/appcache/application_1520347847754_0722/container_e48_1520347847754_0722_01_000006/pyspark.zip/pyspark/worker.py", line 91, in read_udfs
_, udf = read_single_udf(pickleSer, infile)
File "/grid/3/hadoop/yarn/local/usercache/sfalk/appcache/application_1520347847754_0722/container_e48_1520347847754_0722_01_000006/pyspark.zip/pyspark/worker.py", line 78, in read_single_udf
f, return_type = read_command(pickleSer, infile)
File "/grid/3/hadoop/yarn/local/usercache/sfalk/appcache/application_1520347847754_0722/container_e48_1520347847754_0722_01_000006/pyspark.zip/pyspark/worker.py", line 54, in read_command
command = serializer._read_with_length(file)
File "/grid/3/hadoop/yarn/local/usercache/sfalk/appcache/application_1520347847754_0722/container_e48_1520347847754_0722_01_000006/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
return self.loads(obj)
File "/grid/3/hadoop/yarn/local/usercache/sfalk/appcache/application_1520347847754_0722/container_e48_1520347847754_0722_01_000006/pyspark.zip/pyspark/serializers.py", line 419, in loads
return pickle.loads(obj, encoding=encoding)
ImportError: No module named 'themodule'
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:124)
at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:68)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:766)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:766)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Итак .. themodule
является корнем моего проекта, а themodule/otherstuff
содержит методы и функции, которые я получил.
themodule/cluster/pipeline/run_luigi.py
themodule/otherstuff/
там, где начинается мой трубопровод. Выглядит примерно так:
import logging
import luigi
from themodule.cluster.pipeline.task import PreprocessRawData
class MainTask(luigi.Task):
def requires(self):
return PreprocessRawData()
def run(self):
logging.info('All tasks done.')
return
def output(self):
return self.requires().output()
if __name__ == '__main__':
luigi.run()
print('All done.')
Локально -> нет проблем. По кластеру -> см. Выше.
И хуже всего то, что я даже не вижу, где происходит сбой импорта. Во всяком случае, это не должно подвести.
Я знаю, что это не очень хорошо, но это небольшой скрипт, который я выполняю, чтобы запустить конвейер в данный момент:
#!/bin/bash
PYTHONPATH=.:$PYTHONPATH PYTHONHASHSEED=0 SPARK_YARN_USER_ENV="PYTHONHASHSEED=0" PYSPARK_PYTHON=/usr/local/bin/python3.5 SPARK_MAJOR_VERSION=2 LUIGI_CONFIG_PATH=/home/sfalk/workspaces/theproject/python/luigi-cluster.cfg /usr/local/bin/python3.5 themodule/cluster/pipeline/run_luigi.py --local-scheduler MainTask
Давайте не будем забывать, что это работает локально.
Возможно, это что-то простое, но я не вижу, чего мне не хватает.