Мой сценарий mxnet, вероятно, ограничен вводом-выводом данных, загружаемых в графический процессор, и я пытаюсь ускорить это путем предварительной выборки.Проблема в том, что я не могу понять, как выполнить предварительную выборку с помощью пользовательского итератора данных.
Моя первая гипотеза / надежда заключалась в том, что этого будет достаточно, чтобы установить значения self.preprocess_threads и self.prefetch_buffer, как я видел здесь для итераторов, таких как mxnet.io.ImageRecordUInt8Iter
.Однако, когда я сделал это, я не увидел никаких изменений производительности по сравнению со сценарием до того, как установил эти переменные, поэтому их четкая настройка не сработала.
Затем я заметил существование класса mx.io.PrefetchingIter
в дополнение к базовому классу, для которого я реализовал дочерний класс mx.io.DataIter
.Я нашел эту документацию , но я не смог найти ни одного примера, и меня немного смущает, что и где должно произойти.Тем не менее, я не ясно, как это использовать.Например.Я вижу, что в дополнение к next()
у него есть метод iter_next()
, который просто говорит "перейти к следующей партии".Что это значит точно?Что значит «перейти» к следующей партии, не производя ее?Я нашел исходный код для этого класса, и, основываясь на кратком чтении, кажется, что он принимает несколько итераторов и создает один поток на каждый итератор.Это, скорее всего, не сработает для моего текущего проекта, так как я действительно хочу, чтобы несколько потоков использовались для предварительной выборки с одного итератора.
Вот что я пытаюсь сделать с помощью пользовательского итератора данных
- Я поддерживаю глобальный
multiprocessing.Queue
, для которого я извлекаю данные, когда они становятся доступными - Я создаю эти данные, запуская (через
multiprocessing
) сценарий командной строки, который выполняет двоичный файл c ++, который создает numpy
file - Я открываю файл
numpy
и загружаю его содержимое в память, обрабатываю их и помещаю обработанные биты в глобальный multiprocessing.Queue
- Мой пользовательский итератор вытягивает эту очередь итакже запускает больше заданий для получения большего количества данных, когда очередь пуста.
Вот мой код:
def launchJobForDate(date_str):
### this is a function that gets called via multiprocessing
### to produce new data by calling a c++ binary
### whenever data queue is empty so that we need to produce more data
try:
f = "testdata/data%s.npy"%date_str
if not os.path.isfile(f):
cmd = CMD % ( date_str, JSON_FILE, date_str, date_str, date_str)
while True:
try:
output = subprocess.check_output(cmd, shell=True)
break
except:
pass
while True:
try:
d = np.load(f)
break
except:
pass
data_queue.put((d, date_str))
except Exception as ex:
print("launchJobForDate: ERROR ", ex)
class ProduceDataIter(mx.io.DataIter):
@staticmethod
def processData(d, time_steps, num_inputs):
try:
...processes data...
return [z for z in zip(bigX, bigY, bigEvalY, dates)]
except Exception as ex:
print("processData: ERROR ", ex)
def __init__(self, num_mgrs, end_date_str):
## iter stuff
self.preprocess_threads = 4
self.prefetch_buffer = 1
## set up internal data to preserve state
## and make a list of dates for which to run binary
@property
def provide_data(self):
return [mx.io.DataDesc(name='seq_var',
shape=(args_batch_size * GPU_COUNT,
self.time_steps,
self.num_inputs),
layout='NTC')]
@property
def provide_label(self):
return [mx.io.DataDesc(name='bd_return',
shape=(args_batch_size * GPU_COUNT)),
mx.io.DataDesc(name='bd_return',
shape=(args_batch_size * GPU_COUNT, num_y_cols)),
mx.io.DataDesc(name='date',
shape=(args_batch_size * GPU_COUNT))]
def __next__(self):
try:
z = self.z.pop(0)
data = z[0:1]
label = z[1:]
return mx.io.DataBatch(data, label)
except Exception as ex:
### if self.z (a list) has no elements to pop we need
### to get more data off the queue, process it, and put it
### on self.x so it's ready for calls to __next__()
while True:
try:
d = data_queue.get_nowait()
processedData = ProduceDataIter.processData(d,
self.time_steps,
self.num_inputs)
self.z.extend(processedData)
counter_queue.put(counter_queue.get() - 1)
z = self.z.pop(0)
data = z[0:1]
label = z[1:]
return mx.io.DataBatch(data, label)
except queue.Empty:
...this is where new jobs to produce new data and put them
...on the queue would happen if nothing is left on the queue
Затем я попытался создать один из этих итераторов, а такжеитератор предварительной выборки, например, так:
mgr = ProcessMgr(2, end_date_str)
mgrOuter = mx.io.PrefetchingIter([mgr])
Проблема в том, что mgrOuter
немедленно выбрасывает StopIteration
, как только __next__()
вызывается в первый раз, и без вызова mgr.__next__()
, как я думалможет.
Наконец, я также заметил, что gluon
имеет объект DataLoader
, который выглядит так, как будто он может обрабатывать предварительную выборку , однако в этом случае также предполагается, что базовые данные взяты из Dataset
который имеет конечную и неизменную компоновку (исходя из того факта, что он реализован в терминах getitem
, который принимает индекс).Поэтому я не использовал этот вариант, поскольку он кажется бесперспективным, учитывая динамический характер данных, которые я генерирую в качестве входных данных для обучения в виде очереди.
Мои вопросы:
- Как мне нужно изменить мой код выше, чтобы обеспечить предварительную выборку для моего пользовательского итератора?
- Где я могу найти пример или более подробную документацию о том, как работает mx.io.PrefetchingIter?
- Существуют ли другие стратегии, о которых мне следует знать, чтобы повысить производительность своих графических процессоров с помощью специального итератора?Сейчас они работают только на 50% мощности, и увеличение (или уменьшение) размера партии не меняет этого.Какие еще ручки можно включить, чтобы повысить эффективность использования графического процессора?
Спасибо за любые отзывы и советы.