Реализация набора данных и загрузчика данных с бесконечным циклом в PyTorch - PullRequest
0 голосов
/ 25 января 2019

Я хотел бы реализовать бесконечный цикл Dataset & DataLoader.Вот что я попробовал:

class Infinite(Dataset):
    def __len__(self):
        return HPARAMS.batch_size
#         return 1<<30 # This causes huge memory usage.
    def __getitem__(self, idx):
        """Randomly generates one new example."""
        return sample_func_to_be_parallelized()

infinite_loader = DataLoader(
    dataset=Infinite(), 
    batch_size=HPARAMS.batch_size, 
    num_workers=16,
    worker_init_fn=lambda worker_id: np.random.seed(worker_id),  
)

while True:
    for idx, data in enumerate(infinite_loader):
        # forward + backward on "data"

Как видите, основной проблемой здесь является __len()__ метод.Если я помещу туда достаточно большое число, например, 1 << 30, то симптомом будет то, что использование памяти при первой итерации цикла поезда увеличится до 10 + ГБ.Через некоторое время рабочих убивают, предположительно, из-за OOM. </p>

Если я добавлю туда небольшое число, например 1 или BATCH_SIZE, выборочные «данные» в цикле поезда будут периодически дублироваться.Это не то, чего я хочу, так как я хотел бы, чтобы новые данные генерировались и обрабатывались на каждой итерации.

Я предполагаю, что виновник чрезмерного использования памяти находится где-то в стеке, куча вещейкэшируютсяПри случайном взгляде на Python я не могу точно определить, где.

Может кто-нибудь посоветовать, как лучше всего реализовать то, что я хочу реализовать?(Используйте параллельную загрузку DataLoader, одновременно гарантируя, что каждый загруженный пакет является совершенно новым.)

Ответы [ 3 ]

0 голосов
/ 24 февраля 2019

Попробуйте использовать cycle из itertools.Вот пример для простого набора данных:

Код:

from itertools import cycle

import torch
from torch.utils.data import Dataset, DataLoader


# Create some dummy data.
data = torch.tensor([[0, 0],
                     [1, 1],
                     [2, 2],
                     [3, 3]])


class DataSet(Dataset):
    """Our dataset. Iterates over tensor data"""

    def __init__(self, data):
        self.data = data
        self.n = self.data.shape[0]

    def __len__(self):
        return self.n

    def __getitem__(self, idx):
        return self.data[idx]


bs = 1  # batch size
workers = 1  # number of workers

dataset = DataSet(data)
data_loader = DataLoader(dataset, batch_size=bs, shuffle=False, num_workers=workers)

# Infinite loop.
print(f'batch size: {bs} | number of workers: {workers}')
for i, data in cycle(enumerate(data_loader)):
    print(i, data)

Вывод:

batch size: 1 | number of workers: 1
0 tensor([[0, 0]])
1 tensor([[1, 1]])
2 tensor([[2, 2]])
3 tensor([[3, 3]])
0 tensor([[0, 0]])
1 tensor([[1, 1]])
2 tensor([[2, 2]])
3 tensor([[3, 3]])
...

batch size: 2 | number of workers: 2
0 tensor([[0, 0],
        [1, 1]])
1 tensor([[2, 2],
        [3, 3]])
0 tensor([[0, 0],
        [1, 1]])
1 tensor([[2, 2],
...
0 голосов
/ 27 февраля 2019

Это работает без периодического дублирования данных:

import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader

BATCH_SIZE = 2

class Infinite(Dataset):

    def __len__(self):
        return BATCH_SIZE

    def __getitem__(self, idx):
        return torch.randint(0, 10, (3,))


data_loader = DataLoader(Infinite(), batch_size=BATCH_SIZE, num_workers=16)

batch_count = 0
while True:
    batch_count += 1
    print(f'Batch {batch_count}:')

    data = next(iter(data_loader))
    print(data)
    # forward + backward on "data"  

    if batch_count == 5:
        break

Результат:

Batch 1:
tensor([[4, 7, 7],
        [0, 8, 0]])
Batch 2:
tensor([[6, 8, 6],
        [2, 6, 7]])
Batch 3:
tensor([[6, 6, 2],
        [8, 7, 0]])
Batch 4:
tensor([[9, 4, 8],
        [2, 4, 1]])
Batch 5:
tensor([[9, 6, 1],
        [2, 7, 5]])

Так что я думаю, что проблема в вашей функции sample_func_to_be_parallelized().


Редактировать : Если вместо torch.randint(0, 10, (3,)) я использую np.random.randint(10, size=3) в __getitem__ (как пример sample_func_to_be_parallelized()), то данные действительно дублируются в каждом пакете. Смотрите этот выпуск .

Так что, если вы используете RGN numpy где-то в вашем sample_func_to_be_parallelized(), то обходной путь должен использовать

worker_init_fn=lambda worker_id: np.random.seed(np.random.get_state()[1][0] + worker_id) 

и для сброса семян на np.random.seed() перед каждым вызовом data = next(iter(data_loader)).

0 голосов
/ 25 января 2019

DataLoader образцы вашего набора данных без замены . Для этого он генерирует случайную перестановку индексов от 0 до len(dataset). Я предполагаю, что эта перестановка ответственна за поглощение большей части вашей памяти. Я не думаю, что API PyTorch поддерживают бесконечные коллекции, но вы можете попробовать разветвить код в DataLoader и сделать это самостоятельно. Вы можете использовать параметр batch_sampler и передать пользовательский вариант, реализованный на основе RandomSampler. Это позволит вам сохранить часть параллельной загрузки DataLoader.

При этом протокол итерации, основанный на __len__ и __getitem__, просто не подходит для бесконечных коллекций. Вам может быть лучше переопределить ваш Dataset.__len__, чтобы просто вернуть 1, ваш Dataset.__getitem__, чтобы всегда возвращать новый образец, независимо от индекса, а затем производить выборку n раз с заменой из этого набор данных. Технически, он будет запрашивать n раз для 0-го семпла, но так как вы переопределяете __getitem__ для возврата разных семплов, это будет эффективно делать то, что вы ищете.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...