mxnet: как настроить пользовательский mx.io.DataIter для предварительной выборки? - PullRequest
0 голосов
/ 08 декабря 2018

Мой сценарий mxnet, вероятно, ограничен вводом-выводом данных, загружаемых в графический процессор, и я пытаюсь ускорить это путем предварительной выборки.Проблема в том, что я не могу понять, как выполнить предварительную выборку с помощью пользовательского итератора данных.

Моя первая гипотеза / надежда заключалась в том, что этого будет достаточно, чтобы установить значения self.preprocess_threads и self.prefetch_buffer, как я видел здесь для итераторов, таких как mxnet.io.ImageRecordUInt8Iter.Однако, когда я сделал это, я не увидел никаких изменений производительности по сравнению со сценарием до того, как установил эти переменные, поэтому их четкая настройка не сработала.

Затем я заметил существование класса mx.io.PrefetchingIter в дополнение к базовому классу, для которого я реализовал дочерний класс mx.io.DataIter.Я нашел эту документацию , но я не смог найти ни одного примера, и меня немного смущает, что и где должно произойти.Тем не менее, я не ясно, как это использовать.Например.Я вижу, что в дополнение к next() у него есть метод iter_next(), который просто говорит "перейти к следующей партии".Что это значит точно?Что значит «перейти» к следующей партии, не производя ее?Я нашел исходный код для этого класса, и, основываясь на кратком чтении, кажется, что он принимает несколько итераторов и создает один поток на каждый итератор.Это, скорее всего, не сработает для моего текущего проекта, так как я действительно хочу, чтобы несколько потоков использовались для предварительной выборки с одного итератора.

Вот что я пытаюсь сделать с помощью пользовательского итератора данных

  1. Я поддерживаю глобальный multiprocessing.Queue, для которого я извлекаю данные, когда они становятся доступными
  2. Я создаю эти данные, запуская (через multiprocessing) сценарий командной строки, который выполняет двоичный файл c ++, который создает numpy file
  3. Я открываю файл numpy и загружаю его содержимое в память, обрабатываю их и помещаю обработанные биты в глобальный multiprocessing.Queue
  4. Мой пользовательский итератор вытягивает эту очередь итакже запускает больше заданий для получения большего количества данных, когда очередь пуста.

Вот мой код:

def launchJobForDate(date_str):
### this is a function that gets called via multiprocessing
### to produce new data by calling a c++ binary
### whenever data queue is empty so that we need to produce more data
    try:
        f = "testdata/data%s.npy"%date_str
        if not os.path.isfile(f):
            cmd = CMD % ( date_str, JSON_FILE, date_str, date_str, date_str)
            while True:
                try:
                    output = subprocess.check_output(cmd, shell=True)
                    break
                except:
                    pass
        while True:
            try:
                d = np.load(f)
                break
            except:
                pass
        data_queue.put((d, date_str))
    except Exception as ex:
        print("launchJobForDate: ERROR ", ex)

class ProduceDataIter(mx.io.DataIter):
    @staticmethod
    def processData(d, time_steps, num_inputs):
       try: 
            ...processes data...
            return [z for z in zip(bigX, bigY, bigEvalY, dates)]
        except Exception as ex:
            print("processData: ERROR ", ex)

    def __init__(self, num_mgrs, end_date_str):
        ## iter stuff
        self.preprocess_threads = 4
        self.prefetch_buffer = 1

        ## set up internal data to preserve state
        ## and make a list of dates for which to run binary

    @property
    def provide_data(self):
        return [mx.io.DataDesc(name='seq_var', 
                               shape=(args_batch_size * GPU_COUNT, 
                                      self.time_steps, 
                                      self.num_inputs), 
                               layout='NTC')]

    @property
    def provide_label(self):
        return [mx.io.DataDesc(name='bd_return', 
                                shape=(args_batch_size * GPU_COUNT)),             
                mx.io.DataDesc(name='bd_return', 
                                shape=(args_batch_size * GPU_COUNT, num_y_cols)), 
                mx.io.DataDesc(name='date', 
                               shape=(args_batch_size * GPU_COUNT))]                 


    def __next__(self):
        try:
            z = self.z.pop(0)       
            data = z[0:1]
            label = z[1:]
            return mx.io.DataBatch(data, label) 
        except Exception as ex:
            ### if self.z (a list) has no elements to pop we need
            ### to get more data off the queue, process it, and put it
            ### on self.x so it's ready for calls to __next__()
            while True:
                try:
                    d = data_queue.get_nowait()
                    processedData = ProduceDataIter.processData(d, 
                                                            self.time_steps, 
                                                            self.num_inputs)
                    self.z.extend(processedData)
                    counter_queue.put(counter_queue.get() - 1)

                    z = self.z.pop(0)
                    data = z[0:1]
                    label = z[1:]
                    return mx.io.DataBatch(data, label)

                except queue.Empty:
                    ...this is where new jobs to produce new data and put them 
                    ...on the queue would happen if nothing is left on the queue

Затем я попытался создать один из этих итераторов, а такжеитератор предварительной выборки, например, так:

mgr      = ProcessMgr(2, end_date_str)
mgrOuter = mx.io.PrefetchingIter([mgr])

Проблема в том, что mgrOuter немедленно выбрасывает StopIteration, как только __next__() вызывается в первый раз, и без вызова mgr.__next__(), как я думалможет.

Наконец, я также заметил, что gluon имеет объект DataLoader, который выглядит так, как будто он может обрабатывать предварительную выборку , однако в этом случае также предполагается, что базовые данные взяты из Datasetкоторый имеет конечную и неизменную компоновку (исходя из того факта, что он реализован в терминах getitem, который принимает индекс).Поэтому я не использовал этот вариант, поскольку он кажется бесперспективным, учитывая динамический характер данных, которые я генерирую в качестве входных данных для обучения в виде очереди.

Мои вопросы:

  • Как мне нужно изменить мой код выше, чтобы обеспечить предварительную выборку для моего пользовательского итератора?
  • Где я могу найти пример или более подробную документацию о том, как работает mx.io.PrefetchingIter?
  • Существуют ли другие стратегии, о которых мне следует знать, чтобы повысить производительность своих графических процессоров с помощью специального итератора?Сейчас они работают только на 50% мощности, и увеличение (или уменьшение) размера партии не меняет этого.Какие еще ручки можно включить, чтобы повысить эффективность использования графического процессора?

Спасибо за любые отзывы и советы.

1 Ответ

0 голосов
/ 03 января 2019

Как вы уже упоминали, глюон DataLoader обеспечивает предварительную выборку.В вашем пользовательском DataIterator вы используете массивы Numpy в качестве входных данных.Таким образом, вы можете сделать следующее:

f = "testdata/data%s.npy"%date_str
data = np.load(f)
train = gluon.data.ArrayDataset(mx.nd.array(data))
train_iter = gluon.data.DataLoader(train, shuffle=True, num_workers=4, batch_size=batch_size, last_batch='rollover')

Поскольку вы динамически создаете свои данные, вы можете попробовать сбросить DataLoader в каждую эпоху и загрузить новый массив Numpy.Если использование графического процессора все еще низкое, попробуйте увеличить batch_size и num_workers.Еще одной проблемой может быть также размер вашего набора данных.Сброс DataLoader повлияет на производительность, поэтому больший набор данных увеличит время эпохи и, следовательно, увеличит производительность.

...