Я пытаюсь создать сервер извлечения объектов, используя M XNET в python. Сервер должен быть многопоточным и разрешать несколько подключений / вычислений одновременно.
Сначала я создал класс для экстрактора следующим образом:
class ExtractorModel:
def __init__(self, image_size='112,112', model='./', gpu=0):
if gpu:
ctx = mx.gpu(0)
else:
ctx = mx.cpu(0)
_vec = image_size.split(',')
assert len(_vec) == 2
image_size = (int(_vec[0]), int(_vec[1]))
self.model = None
timer = time.time()
if len(model) > 0:
self.model = self.get_model(ctx, image_size, model, 'fc1')
self.image_size = image_size
def get_model(self, ctx, image_size, model_str, layer):
_vec = model_str.split(',')
assert len(_vec) == 2
prefix = _vec[0]
epoch = int(_vec[1])
sym, arg_params, aux_params = mx.model.load_checkpoint(prefix, epoch)
all_layers = sym.get_internals()
sym = all_layers[layer + '_output']
inputs = mx.sym.var('data')
model = mx.mod.Module(symbol=sym, context=ctx, label_names=None)
model.bind(data_shapes=[('data', (1, 3, image_size[0], image_size[1]))])
model.set_params(arg_params, aux_params)
return model
def extract_features(self, image):
if image is not None:
image= cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
image = np.transpose(image, (2, 0, 1))
input_blob = np.expand_dims(image, axis=0)
data = mx.nd.array(input_blob)
db = mx.io.DataBatch(data=(data,))
self.model.forward(db, is_train=False)
features = self.model.get_outputs()[0].asnumpy()
features = sklearn.preprocessing.normalize(features).flatten()
return features
.
.
.
в основном я определил сокет и сервер, создал объект ExtractorModel и начал прослушивать входящее соединение, и всякий раз, когда соединение установлено, я запускаю новый поток для извлечения функции:
основной код:
.
.
.
extract_model = ExtractorModel(image_size='112,112', model=model_path, gpu=1)
while True:
server.listen(5)
client_socket, _ = server.accept()
new_thread = ExtractorServer(client_socket, extract_model)
new_thread.start()
Сервер экстрактора является Класс для обработки потока и расчета функций:
class ExtractorServer(threading.Thread):
def __init__(self, client_socket, ext_model):
threading.Thread.__init__(self)
self.conn_socket = client_socket
self.ext_model = ext_model
print("................... Started New Connection ...................")
def run(self):
image = cv2.imread("test.jpg") #image is a 112x112 image
if image is not None:
features = self.ext_model.extract_features(image)
.
.
.
Теперь приходит проблема. когда с клиентом происходит одно соединение, код запускается так, как должно. но когда у меня несколько соединений, происходят забавные вещи! Случайным образом код либо выполняет переадресацию net для нескольких соединений, но занимает больше времени для переадресации, как если бы они выполнялись последовательно, а не параллельно; или код аварийно завершает работу, вызывая следующую ошибку:
raise MXNetError(py_str(_LIB.MXGetLastError()))
mxnet.base.MXNetError: [16:48:33] /home/travis/build/dmlc/mxnet-distro/mxnet-build/3rdparty/tvm/nnvm/include/nnvm/graph.h:263: Check failed: it != attrs.end(): Cannot find attribute shape in the graph
Неправильно ли выполнено выполнение потоков? Является ли M XNET Thread безопасным для выполнения нескольких пересылок на одной и той же модели для разных входных данных? Если M XNET может обрабатывать несколько форвардов, как я могу исправить случайный случай с cra sh? (Я не уверен, что это гонка данных или нет)