Потоковая передача больших объемов данных от разных клиентов одновременно - PullRequest
0 голосов
/ 26 января 2020

Это небольшая проблема архитектуры и кода. У меня много исходных URL-адресов, содержащих огромные файлы, которые приходят от разных клиентов, которые я должен загрузить и сохранить в файловой системе.

У меня есть аппаратные ограничения на ОЗУ. Поэтому я хочу буферизовать каждый поток кусками байтов, и я думаю, что будет хорошей идеей инициировать один поток для каждой загрузки потока.

Я добавил код для запуска потока / задачи с помощью Task Parallel Библиотека как таковая:

public Task RunTask(Action action)
{
    Task task = Task.Run(action);

    return task;
}

и я передаю для параметра действия следующий метод:

public void DownloadFileThroughWebStream(WebClient webClient, Uri src, string dest, long buffersize)
{
    Stream stream = webClient.OpenRead(src);

    byte[] buffer = new byte[buffersize];
    int len;
    using (BufferedStream bufferedStream = new BufferedStream(stream))
    {
        using (FileStream fileStream = new FileStream(Path.GetFullPath(dest), FileMode.Create, FileAccess.Write))
        {
            while ((len = stream.Read(buffer, 0, buffer.Length)) > 0)
            {
                fileStream.Write(buffer, 0, len);
                fileStream.Flush();
            }
        }

    }
}

И я в целях тестирования пытаюсь загрузить некоторые ресурсы из http uri, как, инициируя поток / задача для каждого специфика c скачать:

[Test]
public async Task DownloadSomeStream()
{
    Uri uri = new Uri("http://mirrors.standaloneinstaller.com/video-sample/metaxas-keller-Bell.mpeg");

    List<Uri> streams = new List<Uri> { uri, uri, uri};

    List<Task> tasks = new List<Task>();

    var path = "C:\\TMP\\";
    //Create task for each of the streams from uri
    int c = 1;
    foreach (var uri in streams)
    {
        WebClient webClient = new WebClient();
        Task task = taskInitiator.RunTask(() => DownloadFileThroughWebStream(webClient, uri, Path.Combine(path,"File"+c), 8192));
        tasks.Add(task);
        c++;
    }
    Task allTasksHaveCompleted = Task.WhenAll(tasks);
    await allTasksHaveCompleted;
}

Я получаю следующее исключение:

System.IO.IOException: 'The process cannot access the file 'D:\TMP\File4' because it is being used by another process'

в строке:

using (FileStream fileStream = new FileStream(Path.GetFullPath(dest), FileMode.Create, FileAccess.Write))

Итак, есть две вещи, которые я не понимаю с этим исключением:

  1. Почему нельзя писать? и как другой процесс распределяет файл?

  2. Почему он хочет сохранить file4, когда я только добавил 3 URL, поэтому у меня должны быть только файлы: file1, file2, and file3?

Кроме того, другие вопросы, которые могут быть полезны для размышления:

  1. Правильно ли я подхожу к тому, что я делаю в отношении того, что я хочешь добиться? Правильно ли я выполняю инициации задач с помощью библиотеки параллельных задач?

  2. Любые советы и рекомендации, лучшие практики и т. Д. c.?

Ответы [ 2 ]

1 голос
/ 27 января 2020

Почему нельзя писать? и как другой процесс выделяет файл?

Файл заблокирован не другим процессом, а тем же процессом. Если вы открываете файл для записи, вы в основном получаете эксклюзивную блокировку для него. Когда вы снова пытаетесь открыть файл для записи из другой задачи, он блокируется, и поэтому вы получаете сообщение об ошибке.

Чтобы справиться с этим случаем, вы должны поставить lock вокруг записи данных на диск , У вас должен быть отдельный объект блокировки для каждого уникального имени файла, в который вы пишете, и будьте осторожны, чтобы использовать правильную блокировку!

Почему он хочет сохранить файл4, когда я Я добавил только 3 URL, поэтому у меня должны быть только файлы: file1, file2 и file3?

Это потому, что вы перехватываете переменную c в делегате, который вы передаете Task.Run. Поскольку эти задачи обычно начинаются после того, как l oop закончится, значение c теперь равно 4. См. здесь для получения дополнительной информации о замыканиях.

0 голосов
/ 27 января 2020

Мы можем создать метод загрузки, который может выполнять загрузку:

async Task DownloadFile(string url, string location, string fileName)
{
    using (var client = new WebClient())
    {
        await client.DownloadFileTaskAsync(url, $"{location}{fileName}");
    }
}

И вышеупомянутый метод может быть вызван Task.Run() для одновременной загрузки файлов:

IList<string> urls = new List<string>()
{
    @"http://mirrors.standaloneinstaller.com/video-sample/metaxas-keller-Bell.mpeg",
    @"https://...",
    @"https://..."
};

string location = "D:";
Directory.CreateDirectory(location);

Task.Run(async () =>
{
    var tasks = urls.Select(url => 
    {
        var fileName = url.Substring(url.LastIndexOf('/'));
        return DownloadFile(url, location, fileName);
    }).ToArray();
    await Task.WhenAll(tasks);
}).GetAwaiter().GetResult();
...