Как работает параллельный метод и распределенный метод pytorch? - PullRequest
0 голосов
/ 19 ноября 2018

Я не эксперт в распределенных системах и CUDA.Но есть одна действительно интересная особенность, которую поддерживает PyTorch: nn.DataParallel и nn.DistributedDataParallel.Как они на самом деле реализованы?Как они разделяют общие вложения и синхронизируют данные?

Вот базовый пример DataParallel.

import torch.nn as nn
from torch.autograd.variable import Variable
import numpy as np

class Model(nn.Module):
    def __init__(self):
        super().__init__(
            embedding=nn.Embedding(1000, 10),
            rnn=nn.Linear(10, 10),
        )

    def forward(self, x):
        x = self.embedding(x)
        x = self.rnn(x)
        return x

model = nn.DataParallel(Model())
model.forward(Variable.from_numpy(np.array([1,2,3,4,5,6], dtype=np.int64)).cuda()).cpu()

PyTorch может разделить входные данные и отправить их во многие графические процессоры и объединить результаты обратно.

Как он управляет вложениями и синхронизацией для параллельной модели или распределенной модели?
Я бродил по коду PyTorch, но очень трудно понять, как работают основы.

1 Ответ

0 голосов
/ 28 ноября 2018

Из того, что я могу проследить, код реализован в parallel_apply.py

[Редактировать: Вставьте сюда код для удобства]

def parallel_apply(modules, inputs, kwargs_tup=None, devices=None):
    r"""Applies each `module` in :attr:`modules` in parallel on arguments
    contained in :attr:`inputs` (positional) and :attr:`kwargs_tup` (keyword)
    on each of :attr:`devices`.
    Args:
        modules (Module): modules to be parallelized
        inputs (tensor): inputs to the modules
        devices (list of int or torch.device): CUDA devices
    :attr:`modules`, :attr:`inputs`, :attr:`kwargs_tup` (if given), and
    :attr:`devices` (if given) should all have same length. Moreover, each
    element of :attr:`inputs` can either be a single object as the only argument
    to a module, or a collection of positional arguments.
    """
    assert len(modules) == len(inputs)
    if kwargs_tup is not None:
        assert len(modules) == len(kwargs_tup)
    else:
        kwargs_tup = ({},) * len(modules)
    if devices is not None:
        assert len(modules) == len(devices)
    else:
        devices = [None] * len(modules)
    devices = list(map(lambda x: _get_device_index(x, True), devices))
    lock = threading.Lock()
    results = {}
    grad_enabled = torch.is_grad_enabled()

    def _worker(i, module, input, kwargs, device=None):
        torch.set_grad_enabled(grad_enabled)
        if device is None:
            device = get_a_var(input).get_device()
        try:
            with torch.cuda.device(device):
                # this also avoids accidental slicing of `input` if it is a Tensor
                if not isinstance(input, (list, tuple)):
                    input = (input,)
                output = module(*input, **kwargs)
            with lock:
                results[i] = output
        except Exception as e:
            with lock:
                results[i] = e

    if len(modules) > 1:
        threads = [threading.Thread(target=_worker,
                                    args=(i, module, input, kwargs, device))
                   for i, (module, input, kwargs, device) in
                   enumerate(zip(modules, inputs, kwargs_tup, devices))]

        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()
    else:
        _worker(0, modules[0], inputs[0], kwargs_tup[0], devices[0])

    outputs = []
    for i in range(len(inputs)):
        output = results[i]
        if isinstance(output, Exception):
            raise output
        outputs.append(output)
    return outputs
  • modules - это параллельные модули.
  • inputs являются тензорами для модулей
  • devices являются устройствами CUDA
  • results и output сохраняют окончательный результат
  • _worker() - основная функция, выполняемая потоком
...