Не могу использовать keras model.predict в pyspark - PullRequest
0 голосов
/ 04 сентября 2018
def make_set(user_embed, country):
    """Rank all candidate entries of *country* for one user and keep the top 500.

    Tiles the user embedding across every entry's feature row, feeds the
    concatenated matrix through the global ``dnn_model``, and returns the
    500 highest-scoring ``(entry_id, score)`` pairs, best first.

    NOTE(review): assumes ``nation_feat[country]`` is a ``(entry_ids, feats)``
    pair of aligned arrays and that ``user_embed`` is a 1-D embedding whose
    width matches what ``dnn_model`` expects — confirm against the caller.
    """
    entry_ids = nation_feat[country][0]
    feats = nation_feat[country][1]
    n_rows = entry_ids.shape[0]
    # One row per candidate: [user_embed | entry features], scored in a single batch.
    model_input = np.concatenate((np.tile(user_embed, (n_rows, 1)), feats[:n_rows]), axis=1)
    scores = dnn_model.predict(model_input, verbose=0, steps=1)
    # Pair ids with their score rows and sort descending by score.
    ranked = sorted(zip(entry_ids, list(scores)), key=lambda pair: pair[1], reverse=True)
    # Each score is a length-1 prediction row; unwrap it to a scalar.
    return [(eid, score[0]) for eid, score in ranked[:500]]
# Key prefix used when persisting ranked results to the KV store.
kv_prefix = 'pyramid_dnn__'

# Parse each raw JSON record, normalize missing fields, project it to a Row
# carrying the user embedding, then score it per-country via make_set.
#
# NOTE(review): make_set closes over the Keras model, so Spark must pickle it
# to ship this pipeline to executors — presumably the source of the
# PicklingError quoted below; verify before relying on this pipeline.
m = (
    df.map(json.loads)
      .map(pad_none)
      .map(lambda rec: Row(user_embed=user_embedding(rec),
                           country=rec['user_country'],
                           userid=rec['user_id']))
      .map(lambda rec: Row(result=make_set(rec.user_embed, rec.country),
                           userid=rec.userid))
)

У меня есть модель keras с именем dnn_model здесь. Я хочу использовать это для прогнозирования для каждой строки. Но тут появляется ошибка:

/root/anaconda2/lib/python2.7/pickle.pyc in save_dict(self, obj)
    653
    654         self.memoize(obj)
--> 655         self._batch_setitems(obj.iteritems())
    656
    657     dispatch[DictionaryType] = save_dict

/root/anaconda2/lib/python2.7/pickle.pyc in _batch_setitems(self, items)
    685                 for k, v in tmp:
    686                     save(k)
--> 687                     save(v)
    688                 write(SETITEMS)
    689             elif n:

/root/anaconda2/lib/python2.7/pickle.pyc in save(self, obj)
    329
    330         # Save the reduce() output and finally memoize the object
--> 331         self.save_reduce(obj=obj, *rv)
    332
    333     def persistent_id(self, obj):

/root/anaconda2/lib/python2.7/site-packages/pyspark/cloudpickle.pyc in save_reduce(self, func, args, state, listitems, dictitems, obj)
    802
    803         if state is not None:
--> 804             save(state)
    805             write(pickle.BUILD)
    806

/root/anaconda2/lib/python2.7/pickle.pyc in save(self, obj)
    284         f = self.dispatch.get(t)
    285         if f:
--> 286             f(self, obj) # Call unbound method with explicit self
    287             return
    288

/root/anaconda2/lib/python2.7/pickle.pyc in save_dict(self, obj)
    653
    654         self.memoize(obj)
--> 655         self._batch_setitems(obj.iteritems())
    656
    657     dispatch[DictionaryType] = save_dict

/root/anaconda2/lib/python2.7/pickle.pyc in _batch_setitems(self, items)
    685                 for k, v in tmp:
    686                     save(k)
--> 687                     save(v)
    688                 write(SETITEMS)
    689             elif n:

/root/anaconda2/lib/python2.7/pickle.pyc in save(self, obj)
    329
    330         # Save the reduce() output and finally memoize the object
--> 331         self.save_reduce(obj=obj, *rv)
    332
    333     def persistent_id(self, obj):

/root/anaconda2/lib/python2.7/site-packages/pyspark/cloudpickle.pyc in save_reduce(self, func, args, state, listitems, dictitems, obj)
    802
    803         if state is not None:
--> 804             save(state)
    805             write(pickle.BUILD)
    806

/root/anaconda2/lib/python2.7/pickle.pyc in save(self, obj)
    284         f = self.dispatch.get(t)
    285         if f:
--> 286             f(self, obj) # Call unbound method with explicit self
    287             return
    288

/root/anaconda2/lib/python2.7/pickle.pyc in save_dict(self, obj)
    653
    654         self.memoize(obj)
--> 655         self._batch_setitems(obj.iteritems())
    656
    657     dispatch[DictionaryType] = save_dict

/root/anaconda2/lib/python2.7/pickle.pyc in _batch_setitems(self, items)
    685                 for k, v in tmp:
    686                     save(k)
--> 687                     save(v)
    688                 write(SETITEMS)
    689             elif n:

/root/anaconda2/lib/python2.7/pickle.pyc in save(self, obj)
    284         f = self.dispatch.get(t)
    285         if f:
--> 286             f(self, obj) # Call unbound method with explicit self
    287             return
    288

/root/anaconda2/lib/python2.7/pickle.pyc in save_list(self, obj)
    604
    605         self.memoize(obj)
--> 606         self._batch_appends(iter(obj))
    607
    608     dispatch[ListType] = save_list

/root/anaconda2/lib/python2.7/pickle.pyc in _batch_appends(self, items)
    637                 write(MARK)
    638                 for x in tmp:
--> 639                     save(x)
    640                 write(APPENDS)
    641             elif n:

/root/anaconda2/lib/python2.7/pickle.pyc in save(self, obj)
    284         f = self.dispatch.get(t)
    285         if f:
--> 286             f(self, obj) # Call unbound method with explicit self
    287             return
    288

/root/anaconda2/lib/python2.7/pickle.pyc in save_tuple(self, obj)
    566         write(MARK)
    567         for element in obj:
--> 568             save(element)
    569
    570         if id(obj) in memo:

/root/anaconda2/lib/python2.7/pickle.pyc in save(self, obj)
    284         f = self.dispatch.get(t)
    285         if f:
--> 286             f(self, obj) # Call unbound method with explicit self
    287             return
    288

/root/anaconda2/lib/python2.7/pickle.pyc in save_dict(self, obj)
    653
    654         self.memoize(obj)
--> 655         self._batch_setitems(obj.iteritems())
    656
    657     dispatch[DictionaryType] = save_dict

/root/anaconda2/lib/python2.7/pickle.pyc in _batch_setitems(self, items)
    685                 for k, v in tmp:
    686                     save(k)
--> 687                     save(v)
    688                 write(SETITEMS)
    689             elif n:

/root/anaconda2/lib/python2.7/pickle.pyc in save(self, obj)
    284         f = self.dispatch.get(t)
    285         if f:
--> 286             f(self, obj) # Call unbound method with explicit self
    287             return
    288

/root/anaconda2/lib/python2.7/pickle.pyc in save_dict(self, obj)
    653
    654         self.memoize(obj)
--> 655         self._batch_setitems(obj.iteritems())
    656
    657     dispatch[DictionaryType] = save_dict

/root/anaconda2/lib/python2.7/pickle.pyc in _batch_setitems(self, items)
    685                 for k, v in tmp:
    686                     save(k)
--> 687                     save(v)
    688                 write(SETITEMS)
    689             elif n:

/root/anaconda2/lib/python2.7/pickle.pyc in save(self, obj)
    284         f = self.dispatch.get(t)
    285         if f:
--> 286             f(self, obj) # Call unbound method with explicit self
    287             return
    288

/root/anaconda2/lib/python2.7/site-packages/pyspark/cloudpickle.pyc in save_instancemethod(self, obj)
    653             else:
    654                 self.save_reduce(types.MethodType, (obj.__func__, obj.__self__, obj.__self__.__class__),
--> 655                          obj=obj)
    656     dispatch[types.MethodType] = save_instancemethod
    657

/root/anaconda2/lib/python2.7/site-packages/pyspark/cloudpickle.pyc in save_reduce(self, func, args, state, listitems, dictitems, obj)
    784         else:
    785             save(func)
--> 786             save(args)
    787             write(pickle.REDUCE)
    788

/root/anaconda2/lib/python2.7/pickle.pyc in save(self, obj)
    284         f = self.dispatch.get(t)
    285         if f:
--> 286             f(self, obj) # Call unbound method with explicit self
    287             return
    288

/root/anaconda2/lib/python2.7/pickle.pyc in save_tuple(self, obj)
    552         if n <= 3 and proto >= 2:
    553             for element in obj:
--> 554                 save(element)
    555             # Subtle.  Same as in the big comment below.
    556             if id(obj) in memo:

/root/anaconda2/lib/python2.7/pickle.pyc in save(self, obj)
    329
    330         # Save the reduce() output and finally memoize the object
--> 331         self.save_reduce(obj=obj, *rv)
    332
    333     def persistent_id(self, obj):

/root/anaconda2/lib/python2.7/site-packages/pyspark/cloudpickle.pyc in save_reduce(self, func, args, state, listitems, dictitems, obj)
    802
    803         if state is not None:
--> 804             save(state)
    805             write(pickle.BUILD)
    806

/root/anaconda2/lib/python2.7/pickle.pyc in save(self, obj)
    284         f = self.dispatch.get(t)
    285         if f:
--> 286             f(self, obj) # Call unbound method with explicit self
    287             return
    288

/root/anaconda2/lib/python2.7/pickle.pyc in save_dict(self, obj)
    653
    654         self.memoize(obj)
--> 655         self._batch_setitems(obj.iteritems())
    656
    657     dispatch[DictionaryType] = save_dict

/root/anaconda2/lib/python2.7/pickle.pyc in _batch_setitems(self, items)
    685                 for k, v in tmp:
    686                     save(k)
--> 687                     save(v)
    688                 write(SETITEMS)
    689             elif n:

/root/anaconda2/lib/python2.7/pickle.pyc in save(self, obj)
    284         f = self.dispatch.get(t)
    285         if f:
--> 286             f(self, obj) # Call unbound method with explicit self
    287             return
    288

/root/anaconda2/lib/python2.7/pickle.pyc in save_dict(self, obj)
    653
    654         self.memoize(obj)
--> 655         self._batch_setitems(obj.iteritems())
    656
    657     dispatch[DictionaryType] = save_dict

/root/anaconda2/lib/python2.7/pickle.pyc in _batch_setitems(self, items)
    685                 for k, v in tmp:
    686                     save(k)
--> 687                     save(v)
    688                 write(SETITEMS)
    689             elif n:

/root/anaconda2/lib/python2.7/pickle.pyc in save(self, obj)
    284         f = self.dispatch.get(t)
    285         if f:
--> 286             f(self, obj) # Call unbound method with explicit self
    287             return
    288

/root/anaconda2/lib/python2.7/site-packages/pyspark/cloudpickle.pyc in save_file(self, obj)
    827             return self.save_reduce(getattr, (sys,'stderr'), obj=obj)
    828         if obj is sys.stdin:
--> 829             raise pickle.PicklingError("Cannot pickle standard input")
    830         if obj.closed:
    831             raise pickle.PicklingError("Cannot pickle closed files")

PicklingError: Cannot pickle standard input

In [30]:

In [30]: BC_MODEL= sc.broadcast(dnn_model)
---------------------------------------------------------------------------
PicklingError                             Traceback (most recent call last)
<ipython-input-30-d4af10347d06> in <module>()
----> 1 BC_MODEL= sc.broadcast(dnn_model)

/root/anaconda2/lib/python2.7/site-packages/pyspark/context.pyc in broadcast(self, value)
    812         be sent to each cluster only once.
    813         """
--> 814         return Broadcast(self, value, self._pickled_broadcast_vars)
    815
    816     def accumulator(self, value, accum_param=None):

/root/anaconda2/lib/python2.7/site-packages/pyspark/broadcast.pyc in __init__(self, sc, value, pickle_registry, path)
     72         if sc is not None:
     73             f = NamedTemporaryFile(delete=False, dir=sc._temp_dir)
---> 74             self._path = self.dump(value, f)
     75             self._jbroadcast = sc._jvm.PythonRDD.readBroadcastFromFile(sc._jsc, self._path)
     76             self._pickle_registry = pickle_registry

/root/anaconda2/lib/python2.7/site-packages/pyspark/broadcast.pyc in dump(self, value, f)
     81     def dump(self, value, f):
     82         try:
---> 83             pickle.dump(value, f, 2)
     84         except pickle.PickleError:
     85             raise

PicklingError: Can't pickle <type 'module'>: attribute lookup __builtin__.module failed

Как это решить? Спасибо.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...