Я пытаюсь использовать тензор потока, служащий для размещения CNN для обнаружения объекта и сделать некоторые дальнейшие вычисления с предсказаниями cnn.Изображения хранятся на облачном сервере, и у меня есть большой список ссылок для скачивания и некоторая дополнительная мета-информация для каждого изображения.Поэтому я должен загрузить их, предсказать содержание и сделать некоторые дополнительные вычисления после этого.Чтобы избежать узких мест, я хотел бы использовать многопроцессорную библиотеку pythons, инициализируя фиксированное количество рабочих: одну для загрузки всех изображений, а другие для передачи каждого изображения на сервер, получения прогноза и выполнения дополнительных вычислений.Я уже попробовал некоторые подходы с «безопасными для процесса» переменными и «Locks», чтобы избежать Broken Pipe
ошибок или ошибок, подобных этой:
ssl.SSLError: [SSL: DECRYPTION_FAILED_OR_BAD_RECORD_MAC] decryption failed or bad record mac (_ssl.c:2281)
Но, похоже, ничего не работает.
Вот минимальный пример моего подхода:
import urllib
from multiprocessing import Process, Manager, Lock
def download_images(info_dict_lock, download_list, info_dict):
#download images and save meta into info_dict
for download_url, img_name, meta_info in download_list:
urllib.urlretrieve(download_url, img_name)
#lock info_dict and save meta_infos
info_dict_lock.acquire()
info_dict[img_name] = meta_info
info_dict_lock.release()
def predict_image(request_lock, img_name, meta_info):
#send image to image server, recive result
#and do some calculations with the result
request_lock.acquire()
result = send_to_server(img_name) #send image to a server
request_lock.release()
#do some calculations with result and meta_info
with Manager() as manager:
# create locks
request_lock = Lock()
info_dict_lock = Lock()
# create process safe info_dict
info_dict = manager.dict()
# start image downloader
p1 = Process(target=download_images, args=(info_dict_lock, download_list, info_dict))
p1.start()
while info_dict or p1.is_alive():
if info_dict:
temp = info_dict
for img_name, meta_info in temp.items():
p2 = Process(target=predict_image, args=(request_lock, img_name, meta_info))
p2.start()
# lock info_dict and delete image form info_dict
info_dict_lock.acquire()
del info_dict[img_name]
info_dict_lock.release()
p2.join()
Переменная info_dict
работает как глобальная переменная (я очень открыта для других решений), в которой хранятся все загруженные изображения, включая метаинформацию.Когда изображение передается работнику p2, соответствующая запись в info_dict
удаляется, чтобы убедиться, что ни один другой работник не может подобрать такое же изображение.Я подумал, что мне просто нужно заблокировать переменную info_dict
перед добавлением или удалением записи, чтобы избежать ошибки, которая произошла выше, но это не решило проблему.Также я выяснил, что запрос к серверу иногда не выполняется, если я инициализирую больше работников, поэтому я также «заблокировал» эту часть кода, чтобы разрешить только один запрос за раз (хотя тензор потока обслуживает заявки для обработки около 100 тыс. Запросов в секунду).).
Кажется, я что-то путаю.Есть ли правильный способ реализации многопроцессорной задачи с запросами к серверу?