We generate ~10k numpy arrays using Keras, and at the end we have to save those arrays as .npy files on S3. The problem is that saving to S3 from inside a Spark map function requires writing an intermediate local file. Instead of creating intermediate files, we want to stream the arrays directly to S3. I used the "cottoncandy" library, but it does not work inside a Spark map function and fails with:
pickle.PicklingError: Could not serialize object: TypeError: can't pickle thread.lock objects
Is there any way / library we can use inside a Spark map function in a deep-learning application to stream numpy arrays directly to S3?
I have an RDD of numpy arrays:
features_rdd
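What I am aiming for, conceptually, is something like the sketch below: np.save accepts any file-like object, so an in-memory io.BytesIO buffer can stand in for the intermediate file, and plain boto3 can upload the resulting bytes. This is untested inside Spark, and the bucket and key names are placeholders:

import io

import boto3
import numpy as np

def upload_npy_in_memory(s3_client, bucket, key, arr):
    # Serialize the array into an in-memory buffer instead of a temp file.
    buf = io.BytesIO()
    np.save(buf, arr)
    buf.seek(0)  # rewind so the full payload is read back out
    s3_client.put_object(Bucket=bucket, Key=key, Body=buf.getvalue())

s3 = boto3.client('s3', region_name='us-east-1')
upload_npy_in_memory(s3, 'BUCKET_NAME', 'prefix/features.npy', np.zeros((4, 4)))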
Options I have tried:
def writePartition(xs):
    cci = cc.get_interface('BUCKET_NAME', ACCESS_KEY=os.environ.get("AWS_ACCESS_KEY_ID"),
                           SECRET_KEY=os.environ.get("AWS_SECRET_ACCESS_KEY"),
                           endpoint_url='https://s3.amazonaws.com')
    # output_path, format_name
    for k, v in xs:
        file_name_with_domain = get_file_with_parents(k, 1)
        file_name = ...
        file_name_without_ext = get_file_name_without_ext(file_name)
        bucket_name = OUTPUT.split('/', 1)[0]
        rest_of_path = OUTPUT.split('/', 1)[1]
        final_path = rest_of_path + '/' + file_name_without_ext + '.npy'
        LOGGER.info("Saving to S3....")
        response = cci.upload_npy_array(final_path, v)
features_rdd.foreachPartition(writePartition)
Option 2:
def writePartition1(xs):
    s3 = boto3.client('s3', region_name='us-east-1')
    for k, v in xs:
        ...
        ...
        np.save(local_dir_full_path, v)
        s3.upload_file(local_dir_full_path, 'BUCKET', s3_full_path)
        os.remove(local_dir_full_path)
features_rdd.foreachPartition(writePartition1)
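Both variants already create their S3 client inside the partition function, so the client itself should not be what fails to pickle. One way to check locally, before submitting the job, whether Spark can ship a given function is to run cloudpickle on it directly (assuming the standalone cloudpickle package is installed; it is the same serializer PySpark uses):

import cloudpickle  # pip install cloudpickle

# If this raises "can't pickle thread.lock objects", the lock is reachable
# from the function or its globals; if it succeeds, the unpicklable object
# is being captured somewhere else in the job.
cloudpickle.dumps(writePartition1)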
Error:
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.7/pickle.py", line 655, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib64/python2.7/pickle.py", line 687, in _batch_setitems
save(v)
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.7/pickle.py", line 606, in save_list
self._batch_appends(iter(obj))
File "/usr/lib64/python2.7/pickle.py", line 642, in _batch_appends
save(tmp[0])
File "/usr/lib64/python2.7/pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 600, in save_reduce
save(state)
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.7/pickle.py", line 655, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib64/python2.7/pickle.py", line 687, in _batch_setitems
save(v)
File "/usr/lib64/python2.7/pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 600, in save_reduce
save(state)
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.7/pickle.py", line 655, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib64/python2.7/pickle.py", line 687, in _batch_setitems
save(v)
File "/usr/lib64/python2.7/pickle.py", line 306, in save
rv = reduce(self.proto)
TypeError: can't pickle thread.lock objects
Traceback (most recent call last):
File "six_file_boto3_write1.py", line 248, in <module>
run()
File "six_file_boto3_write1.py", line 239, in run
features_rdd.foreachPartition(writePartitionWithBoto)
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 799, in foreachPartition
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 1041, in count
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 1032, in sum
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 906, in fold
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 809, in collect
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 2455, in _jrdd
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 2388, in _wrap_function
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 2374, in _prepare_for_python_RDD
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/serializers.py", line 464, in dumps
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 704, in dumps
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 162, in dump
pickle.PicklingError: Could not serialize object: TypeError: can't pickle thread.lock objects
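As far as I can read this traceback, the upload code is not what breaks: cloudpickle recurses through a dict and then a list and hits an object that owns a thread lock while serializing the function being shipped to the workers. Locks simply have no pickle support, which a two-liner reproduces; my guess from the save_dict / save_list frames is that some global the partition function references (the LOGGER, the cottoncandy interface, or the Keras model) is being dragged into the closure:

import pickle
import threading

# Any object that holds a lock fails the same way once pickle reaches it.
pickle.dumps(threading.Lock())
# Python 2: TypeError: can't pickle thread.lock objects
# Python 3: TypeError: cannot pickle '_thread.lock' object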
Imports:
from pyspark.sql import SparkSession
from keras.preprocessing import image
from keras.applications.vgg16 import VGG16
from keras.models import Model
from io import BytesIO
from keras.applications.vgg16 import preprocess_input
import numpy as np
import logging
import os
import boto3
import cottoncandy as cc
So basically, the application works perfectly fine up to features_rdd; I can even check its count. But when I try to save those features, that part fails. The imports are added above.
Updates:
def extract_features(model, obj):
    try:
        print('executing vgg16 feature extractor...')
        img = image.load_img(BytesIO(obj), target_size=(224, 224, 3))
        img_data = image.img_to_array(img)
        img_data = np.expand_dims(img_data, axis=0)
        img_data = preprocess_input(img_data)
        vgg16_feature = model.predict(img_data)[0]
        print('++++++++++++++++++++++++++++', vgg16_feature.shape)
        return vgg16_feature
    except Exception as e:
        print('Error......{}'.format(e.args))
        return []

def extract_features_(xs):
    model_data = initVGG16()
    for k, v in xs:
        yield k, extract_features(model_data, v)
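initVGG16 is not shown here; for completeness, a hypothetical version that matches the imports above and the model.predict(img_data)[0] call would be a headless VGG16 feature extractor along these lines (the choice of the fc2 layer is my assumption, not from the original code):

def initVGG16():
    # Hypothetical: expose the fc2 activations as the feature vector.
    base = VGG16(weights='imagenet', include_top=True)
    return Model(inputs=base.input, outputs=base.get_layer('fc2').output)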
spark = SparkSession \
    .builder \
    .appName('test-app') \
    .getOrCreate()
sc = spark.sparkContext

s3_files_rdd = sc.binaryFiles(RESOLVED_IMAGE_PATH)
s3_files_rdd.persist()
features_rdd = s3_files_rdd.mapPartitions(extract_features_)
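Putting the pieces together, the write step I am aiming for would look like the sketch below: the boto3 client is created inside the partition function on the worker (so it never travels through pickle, the same pattern extract_features_ already uses for the model), and each array goes to S3 through an in-memory buffer, with no local file and no os.remove. writePartitionInMemory is hypothetical and not tested on the cluster; the bucket name and key derivation are placeholders:

def writePartitionInMemory(xs):
    # Created on the worker; must not cross the driver/worker pickle boundary.
    s3 = boto3.client('s3', region_name='us-east-1')
    for k, v in xs:
        buf = BytesIO()
        np.save(buf, np.asarray(v))  # serialize the array in memory
        buf.seek(0)
        s3.put_object(Bucket='BUCKET', Key=k + '.npy', Body=buf.getvalue())

features_rdd.foreachPartition(writePartitionInMemory)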