Tried to use a pickled model for prediction on streaming data. Initially the model was almost 1 GB, and I thought that shrinking it might solve the problem. I used a different pickle protocol plus compression when dumping the object and got it down to 60 MB.
The input stream consists of JSON records, and the prediction is applied to 3 of the keys.
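For context, a rough sketch of how one record is handled; the key names below (item_desc, brand, category) are placeholders, not the real schema:
import json

raw = '{"item_desc": "fresh bananas 3 lb", "brand": "n/a", "category": ""}'  # hypothetical record
record = json.loads(raw)
for key in ('item_desc', 'brand', 'category'):  # placeholder key names
    record[key + '_pred'] = predict_result(record.get(key))  # predict_result is shown further down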
Creating the pickle object:
Before:
joblib.dump(pipeline, 'itemc_nb.pkl')
After:
joblib.dump(pipeline, 'itemc_nb.pkl', compress=1, protocol=-1)
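As I understand joblib, protocol=-1 just selects the highest available pickle protocol, and an integer compress value enables zlib compression at that level; a quick sanity check of the on-disk size after dumping:
import os
import pickle
import joblib

joblib.dump(pipeline, 'itemc_nb.pkl', compress=1, protocol=pickle.HIGHEST_PROTOCOL)  # same effect as protocol=-1
print('size on disk: %.1f MB' % (os.path.getsize('itemc_nb.pkl') / 1024.0 / 1024.0))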
Another theory I checked is memory consumption on the edge node where the streaming script runs. Under full load it sits at about 70%, as can be seen here.
The edge node has 22 GB of memory.
Another thought is how many times the model gets loaded rather than garbage-collected. How can this be arranged so that it is loaded only once?
model = joblib.load(os.path.join(__location__, 'itemc_nb.pkl'))
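One option being considered is to load it lazily at module level, so each Python process unpickles the pipeline at most once; a sketch (assuming __location__ is set as in the script above):
import os
from sklearn.externals import joblib  # or plain joblib, matching the dump side

_model = None

def get_model():
    # unpickle once per process, then reuse for every record
    global _model
    if _model is None:
        _model = joblib.load(os.path.join(__location__, 'itemc_nb.pkl'))
    return _model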
The function called to score the input string is shown below. There may be inefficiencies here that could also be contributing.
import re
from nltk.corpus import stopwords  # assuming NLTK stopwords, matching stopwords.words('english') below

# model and cat_dict are loaded at module level elsewhere in the script
def predict_result(text):
    ret_val = ''
    try:
        if text is not None and (type(text) == str or type(text) == unicode):
            text = text.strip()
            text = text.lower()
            # drop digits, collapse whitespace
            text = ''.join([i for i in text if not i.isdigit()])
            text = ' '.join(text.split())
            # remove English stopwords; note stopwords.words('english') is re-evaluated for every word
            text = ' '.join([word for word in text.split() if word not in (stopwords.words('english'))])
            # maxsplit=0 leaves the cleaned string as a one-element list for the pipeline
            text = text.split(' ', 0)
            if re.match(r"^([a-z]|[0-9])\b", text[0]):  # single letter removal
                return 'non-relevant'
            elif text[0] in ('n/a', 'na', '.', 'nada', 'no', 'xx', ''):  # cleaning list
                return 'non-relevant'
            elif not text[0]:
                return 'non-relevant'
            else:
                prediction = model.predict(text)
                cat_name = cat_dict.get(prediction[0], 'No key found')
                ret_val = cat_name
    except (AttributeError, KeyError) as e:
        ret_val = 'error'
    return ret_val
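For completeness, the function is applied from the Spark job roughly like this; the DataFrame, column, and table names are placeholders, not the real ones:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

predict_udf = udf(predict_result, StringType())
scored = df.withColumn('item_desc_pred', predict_udf(df['item_desc']))  # repeated for the 3 keys
scored.write.insertInto('db.scored_items')  # placeholder table; this is the insertInto seen in the trace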
Looking for some opinions at this point. The exception from the streaming job is below.
Exception encountered while processing data:
An error occurred while calling o394689.insertInto.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 366.0 failed 1 times, most recent failure: Lost task 0.0 in stage 366.0 (TID 366, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/hdp/current/spark-client/python/lib/pyspark.zip/pyspark/worker.py", line 98, in main
command = pickleSer._read_with_length(infile)
File "/usr/hdp/current/spark-client/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
return self.loads(obj)
File "/usr/hdp/current/spark-client/python/lib/pyspark.zip/pyspark/serializers.py", line 442, in loads
return pickle.loads(obj)
File "/usr/hdp/current/spark-client/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 700, in subimport
__import__(name)
File "/tmp/spark-9e6c86f3-4d80-4bef-833e-e5a225d2824f/userFiles-1784ee88-ee98-467d-9abd-f017cccecf49/streaming_models.zip/itemc/itemc_tagger.py", line 14, in <module>
File "/u/users/svcerpp/virtualenvs/spark_kernel/lib/python2.7/site-packages/sklearn/externals/joblib/numpy_pickle.py", line 578, in load
obj = _unpickle(fobj, filename, mmap_mode)
File "/u/users/svcerpp/virtualenvs/spark_kernel/lib/python2.7/site-packages/sklearn/externals/joblib/numpy_pickle.py", line 508, in _unpickle
obj = unpickler.load()
File "/opt/rh/python27/root/usr/lib64/python2.7/pickle.py", line 864, in load
dispatch[key](self)
File "/u/users/svcerpp/virtualenvs/spark_kernel/lib/python2.7/site-packages/sklearn/externals/joblib/numpy_pickle.py", line 341, in load_build
self.stack.append(array_wrapper.read(self))
File "/u/users/svcerpp/virtualenvs/spark_kernel/lib/python2.7/site-packages/sklearn/externals/joblib/numpy_pickle.py", line 184, in read
array = self.read_array(unpickler)
File "/u/users/svcerpp/virtualenvs/spark_kernel/lib/python2.7/site-packages/sklearn/externals/joblib/numpy_pickle.py", line 130, in read_array
array = unpickler.np.empty(count, dtype=self.dtype)
MemoryError: (MemoryError(), <function subimport at 0x7f1d4f353050>, ('itemc.itemc_tagger',))
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
at org.apache.spark.sql.execution.BatchPythonEvaluation$$anonfun$doExecute$1.apply(python.scala:405)
at org.apache.spark.sql.execution.BatchPythonEvaluation$$anonfun$doExecute$1.apply(python.scala:370)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$22.apply(RDD.scala:717)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$22.apply(RDD.scala:717)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:96)
at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:95)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1(InsertIntoHiveTable.scala:170)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:150)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:150)