def make_set(user_embed, country):
    # Candidate entry ids and their feature matrix for this country.
    entry_ids, feats = nation_feat[country][0], nation_feat[country][1]
    cut = entry_ids.shape[0]
    # Pair the tiled user embedding with every candidate's features.
    feats_news_user = np.concatenate((np.tile(user_embed, (cut, 1)), feats[:cut]), axis=1)
    pred_scores = dnn_model.predict(feats_news_user, verbose=0, steps=1)
    result = zip(entry_ids, list(pred_scores))                      # ~73.8 µs
    result_sort = sorted(result, key=lambda l: l[1], reverse=True)  # ~2.8 ms
    # Keep only the top 500 (entry_id, score) pairs.
    memcache_result = [(i[0], i[1][0]) for i in result_sort[:500]]
    # sta = handler.put(userid, memcache_result, pre_fix=kv_prefix, time=3*3600*24)
    return memcache_result
kv_prefix = 'pyramid_dnn__'

m = df.map(lambda x: json.loads(x)) \
      .map(lambda x: pad_none(x)) \
      .map(lambda x: Row(user_embed=user_embedding(x),
                         country=x['user_country'],
                         userid=x['user_id'])) \
      .map(lambda x: Row(result=make_set(x.user_embed, x.country),
                         userid=x.userid))
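The chain is lazy, so nothing fails until an action runs; a minimal trigger (assuming any action that materializes m suffices):

# Any action forces Spark to serialize the closure that captures dnn_model.
m.take(1)  # -> PicklingError, see the traceback below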
Here dnn_model is a compiled Keras model, and I want to use it to run predictions for every row. But as soon as the action runs, this error appears:
/root/anaconda2/lib/python2.7/pickle.pyc in save_dict(self, obj)
    653
    654         self.memoize(obj)
--> 655         self._batch_setitems(obj.iteritems())
    656
    657     dispatch[DictionaryType] = save_dict

[... the same save / save_dict / _batch_setitems / save_reduce frames repeat
 many times as the pickler recurses through the model's attributes ...]

/root/anaconda2/lib/python2.7/site-packages/pyspark/cloudpickle.pyc in save_file(self, obj)
    827             return self.save_reduce(getattr, (sys,'stderr'), obj=obj)
    828         if obj is sys.stdin:
--> 829             raise pickle.PicklingError("Cannot pickle standard input")
    830         if obj.closed:
    831             raise pickle.PicklingError("Cannot pickle closed files")

PicklingError: Cannot pickle standard input
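The model object itself seems to be the unpicklable part. Even a plain pickle on the driver fails for me; a minimal check, assuming protocol 2 to mirror what PySpark's broadcast uses:

import pickle

# A compiled Keras model holds references to modules, locks and file handles,
# none of which the pickler can serialize, so this raises PicklingError too.
pickle.dumps(dnn_model, protocol=2)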
I also tried broadcasting the model, which fails with a similar PicklingError:

In [30]: BC_MODEL = sc.broadcast(dnn_model)
---------------------------------------------------------------------------
PicklingError Traceback (most recent call last)
<ipython-input-30-d4af10347d06> in <module>()
----> 1 BC_MODEL= sc.broadcast(dnn_model)
/root/anaconda2/lib/python2.7/site-packages/pyspark/context.pyc in broadcast(self, value)
812 be sent to each cluster only once.
813 """
--> 814 return Broadcast(self, value, self._pickled_broadcast_vars)
815
816 def accumulator(self, value, accum_param=None):
/root/anaconda2/lib/python2.7/site-packages/pyspark/broadcast.pyc in __init__(self, sc, value, pickle_registry, path)
72 if sc is not None:
73 f = NamedTemporaryFile(delete=False, dir=sc._temp_dir)
---> 74 self._path = self.dump(value, f)
75 self._jbroadcast = sc._jvm.PythonRDD.readBroadcastFromFile(sc._jsc, self._path)
76 self._pickle_registry = pickle_registry
/root/anaconda2/lib/python2.7/site-packages/pyspark/broadcast.pyc in dump(self, value, f)
81 def dump(self, value, f):
82 try:
---> 83 pickle.dump(value, f, 2)
84 except pickle.PickleError:
85 raise
PicklingError: Can't pickle <type 'module'>: attribute lookup __builtin__.module failed
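The only direction I can think of is to avoid pickling the model at all: save it to a file, ship the file with sc.addFile, and re-load it once per partition. A rough, untested sketch of what I mean (score_rows is a hypothetical helper that would reuse the make_set logic with an explicit model argument):

dnn_model.save('dnn_model.h5')   # serialize architecture + weights to disk
sc.addFile('dnn_model.h5')       # ship the file to every executor

def predict_partition(rows):
    # Load the model once per partition instead of capturing it in a closure.
    from keras.models import load_model
    from pyspark import SparkFiles
    model = load_model(SparkFiles.get('dnn_model.h5'))
    for x in rows:
        yield Row(result=score_rows(model, x.user_embed, x.country),
                  userid=x.userid)

# parsed_rows: the map chain above, minus the last .map that calls make_set
m = parsed_rows.mapPartitions(predict_partition)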
Is mapPartitions with a re-loaded model the right approach, or is there a better way to solve this? Thanks.