Ошибка травления Pyspark при попытке загрузить numy массивы в s3 с помощью boto3 - PullRequest
0 голосов
/ 08 ноября 2018

Я пытаюсь загрузить свои массивы в s3 с помощью клиента boto3 в приложении pyspark, но при этом появляется сообщение об ошибке вывода. Ниже мой код.

def write_features3(model,key,obj,output_path, format_name):
    try:
        LOGGER.info('executing vgg16 feature extractor...')
        img = image.load_img(BytesIO(obj), target_size=(224, 224,3))
        img_data = image.img_to_array(img)
        img_data = np.expand_dims(img_data, axis=0)
        img_data = preprocess_input(img_data)
        vgg16_feature = model.predict(img_data)[0]
        LOGGER.info('++++++++++++++++++++++++++++',vgg16_feature.shape)






        file_name_without_ext = get_file_name_without_ext(key)
        rest_of_path = OUTPUT.split('/', 1)[1]
        s3_full_path = rest_of_path + '/' + file_name_without_ext + '.' + '.npy'
        LOGGER.info("Saving to S3....")
        feature_dir = '/home/hadoop'
        s3 = boto3.client('s3', region_name='us-east-1')
        local_dir_full_path = feature_dir + '/' + file_name_without_ext + '.npy'
        np.save(local_dir_full_path, vgg16_feature)
        s3.upload_file(local_dir_full_path, 'test', s3_full_path)
        os.remove(local_dir_full_path)
    except Exception as e:
        print('Error......{}'.format(e.args))
        return []

def write_features_(xs):
    model_data = initVGG16()

    for k, v in xs:
        yield k, write_features3(model_data, k,v,OUTPUT, FORMAT_NAME)

driver program:-
s3_files_rdd = sc.binaryFiles('s3n://....')
features_rdd = s3_files_rdd.foreachPartition(write_features_)

Когда я пытаюсь использовать эту программу, я получаю сообщение об ошибке ниже. Даже я пытался поместить s3-клиент в метод write_features_ partition, но безуспешно. та же ошибка. версия spark - 2.2.1

Ошибка: -

n save_reduce
    save(state)
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib64/python2.7/pickle.py", line 687, in _batch_setitems
    save(v)
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.7/pickle.py", line 606, in save_list
    self._batch_appends(iter(obj))
  File "/usr/lib64/python2.7/pickle.py", line 642, in _batch_appends
    save(tmp[0])
  File "/usr/lib64/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0004/container_1541683970451_0004_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 600, in save_reduce
    save(state)
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib64/python2.7/pickle.py", line 687, in _batch_setitems
    save(v)
  File "/usr/lib64/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0004/container_1541683970451_0004_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 600, in save_reduce
    save(state)
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib64/python2.7/pickle.py", line 687, in _batch_setitems
    save(v)
  File "/usr/lib64/python2.7/pickle.py", line 306, in save
    rv = reduce(self.proto)
TypeError: can't pickle thread.lock objects
Traceback (most recent call last):
  File "six_file_boto3_write1.py", line 249, in <module>
    run()
  File "six_file_boto3_write1.py", line 227, in run
    s3_files_rdd.foreachPartition(write_features_)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0004/container_1541683970451_0004_01_000001/pyspark.zip/pyspark/rdd.py", line 799, in foreachPartition
  File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0004/container_1541683970451_0004_01_000001/pyspark.zip/pyspark/rdd.py", line 1041, in count
  File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0004/container_1541683970451_0004_01_000001/pyspark.zip/pyspark/rdd.py", line 1032, in sum
  File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0004/container_1541683970451_0004_01_000001/pyspark.zip/pyspark/rdd.py", line 906, in fold
  File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0004/container_1541683970451_0004_01_000001/pyspark.zip/pyspark/rdd.py", line 809, in collect
  File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0004/container_1541683970451_0004_01_000001/pyspark.zip/pyspark/rdd.py", line 2455, in _jrdd
  File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0004/container_1541683970451_0004_01_000001/pyspark.zip/pyspark/rdd.py", line 2388, in _wrap_function
  File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0004/container_1541683970451_0004_01_000001/pyspark.zip/pyspark/rdd.py", line 2374, in _prepare_for_python_RDD
  File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0004/container_1541683970451_0004_01_000001/pyspark.zip/pyspark/serializers.py", line 464, in dumps
  File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0004/container_1541683970451_0004_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 704, in dumps
  File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0004/container_1541683970451_0004_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 162, in dump
pickle.PicklingError: Could not serialize object: TypeError: can't pickle thread.lock objects

1 Ответ

0 голосов
/ 25 ноября 2018

Проблема была с версией spark, я использовал spark-2.2.1.Теперь я обновился до spark-2.3.2, все заработало нормально.

...