проблема
Хотя код, о котором я расскажу здесь, я написал на F #, он основан на .NET 4 framework, не зависящем конкретно от какой-либо особенности F # (по крайней мере, так кажется!).
У меня есть некоторые данные на диске, которые я должен обновить из сети, сохранив последнюю версию на диск:
type MyData =
{ field1 : int;
field2 : float }
type MyDataGroup =
{ Data : MyData[];
Id : int }
// load : int -> MyDataGroup
let load dataId =
let data = ... // reads from disk
{ Data = data;
Id = dataId }
// update : MyDataGroup -> MyDataGroup
let update dg =
let newData = ... // reads from the network and process
// newData : MyData[]
{ dg with Data = dg.Data
|> Seq.ofArray
|> Seq.append newData
|> processDataSomehow
|> Seq.toArray }
// save : MyDataGroup -> unit
let save dg = ... // writes to the disk
let loadAndSaveAndUpdate = load >> update >> save
Проблема в том, что для loadAndSaveAndUpdate
всех моих данных мне нужно будет выполнить функцию много раз:
{1 .. 5000} |> loadAndSaveAndUpdate
Каждый шаг будет делать
- некоторый дисковый ввод-вывод,
- некоторые данные хрустят,
- некоторый сетевой ввод-вывод (с большой задержкой),
- больше хруста данных,
- и некоторый дисковый ввод-вывод.
Не было бы неплохо, если бы это было сделано параллельно, до некоторой степени? К сожалению, ни одна из моих функций чтения и синтаксического анализа не "готова к асинхронным рабочим процессам".
Первые (не очень хорошие) решения, которые я придумал
Задачи
Первым делом я настроил Task[]
и запустил их все:
let createTask id = new Task(fun _ -> loadAndUpdateAndSave id)
let tasks = {1 .. 5000}
|> Seq.map createTask
|> Seq.toArray
tasks |> Array.iter (fun x -> x.Start())
Task.WaitAll(tasks)
Затем я нажимаю CTRL + ESC, чтобы посмотреть, сколько потоков он использовал. 15, 17, ..., 35, ..., 170, ... пока не убили приложение! Что-то пошло не так.
Параллельное
Я сделал почти то же самое, но использовал Parallel.ForEach(...)
, и результаты были одинаковыми: много-много-много-много тем.
Решение, которое работает ... вроде
Тогда я решил запустить только n
темы, Task.WaitAll(of them)
, затем другие n
, пока не осталось доступных задач.
Это работает, но проблема в том, что когда он завершит обработку, скажем, n-1
задач, он будет ждать, ждать, ждать последнюю чертову задачу, которая настаивает на блокировке из-за большой задержки в сети. Это не хорошо!
Итак, как бы вы атаковали эту проблему ? Буду признателен за просмотр различных решений, включая асинхронные рабочие процессы (и в данном случае, как адаптировать мои неасинхронные функции), параллельные расширения, странные параллельные шаблоны и т. Д.
Спасибо.