Как использовать многопроцессорную работу с тензорным потоком? - PullRequest
0 голосов
/ 20 ноября 2018

Я пытаюсь использовать тензор потока, служащий для размещения CNN для обнаружения объекта и сделать некоторые дальнейшие вычисления с предсказаниями cnn.Изображения хранятся на облачном сервере, и у меня есть большой список ссылок для скачивания и некоторая дополнительная мета-информация для каждого изображения.Поэтому я должен загрузить их, предсказать содержание и сделать некоторые дополнительные вычисления после этого.Чтобы избежать узких мест, я хотел бы использовать многопроцессорную библиотеку pythons, инициализируя фиксированное количество рабочих: одну для загрузки всех изображений, а другие для передачи каждого изображения на сервер, получения прогноза и выполнения дополнительных вычислений.Я уже попробовал некоторые подходы с «безопасными для процесса» переменными и «Locks», чтобы избежать Broken Pipe ошибок или ошибок, подобных этой:

ssl.SSLError: [SSL: DECRYPTION_FAILED_OR_BAD_RECORD_MAC] decryption failed or bad record mac (_ssl.c:2281)

Но, похоже, ничего не работает.

Вот минимальный пример моего подхода:

import urllib
from multiprocessing import Process, Manager, Lock

def download_images(info_dict_lock, download_list, info_dict):
    #download images and save meta into info_dict

    for download_url, img_name, meta_info in download_list:
        urllib.urlretrieve(download_url, img_name)

        #lock info_dict and save meta_infos 
        info_dict_lock.acquire()
        info_dict[img_name] = meta_info
        info_dict_lock.release()


def predict_image(request_lock, img_name, meta_info):
    #send image to image server, recive result
    #and do some calculations with the result 

    request_lock.acquire()
    result = send_to_server(img_name)  #send image to a server
    request_lock.release()

    #do some calculations with result and meta_info


with Manager() as manager:
    # create locks
    request_lock = Lock()
    info_dict_lock = Lock()

    # create process safe info_dict
    info_dict = manager.dict()

    # start image downloader
    p1 = Process(target=download_images, args=(info_dict_lock, download_list, info_dict))
    p1.start()


    while info_dict or p1.is_alive():
        if info_dict:
            temp = info_dict
            for img_name, meta_info in temp.items(): 

                p2 = Process(target=predict_image, args=(request_lock, img_name, meta_info))
                p2.start()

                # lock info_dict and delete image form info_dict
                info_dict_lock.acquire()
                del info_dict[img_name]
                info_dict_lock.release()

                p2.join()

Переменная info_dict работает как глобальная переменная (я очень открыта для других решений), в которой хранятся все загруженные изображения, включая метаинформацию.Когда изображение передается работнику p2, соответствующая запись в info_dict удаляется, чтобы убедиться, что ни один другой работник не может подобрать такое же изображение.Я подумал, что мне просто нужно заблокировать переменную info_dict перед добавлением или удалением записи, чтобы избежать ошибки, которая произошла выше, но это не решило проблему.Также я выяснил, что запрос к серверу иногда не выполняется, если я инициализирую больше работников, поэтому я также «заблокировал» эту часть кода, чтобы разрешить только один запрос за раз (хотя тензор потока обслуживает заявки для обработки около 100 тыс. Запросов в секунду).).

Кажется, я что-то путаю.Есть ли правильный способ реализации многопроцессорной задачи с запросами к серверу?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...