MailboxProcessor - Скажите, когда остановиться? - PullRequest
2 голосов
/ 09 августа 2011

Я сейчас играю с MailboxProcessor.Поэтому я составил несколько агентов, которые могут сканировать каталог на компьютере и все его подкаталоги, а затем распечатывать файлы в каждом каталоге:

let fileCollector =
  MailboxProcessor.Start(fun self -> 
    let rec loop() =
      async { let! file = self.Receive()
              printfn "%s" file
              return! loop() }
    loop()) 

let folderCollector = 
  MailboxProcessor.Start(fun self -> 
    let rec loop() =
      async { let! dir = self.Receive()
              do! Async.StartChild(
                    async { let! files = Directory.AsyncGetFiles dir
                            for z in files do fileCollector.Post z }) |> Async.Ignore
              return! loop() }
    loop())

let crawler =
  MailboxProcessor.Start(fun self ->
    let rec loop() =
      async { let! dir = self.Receive()
              folderCollector.Post dir
              do! Async.StartChild(
                    async { let! dirs = Directory.AsyncGetDirectories dir
                            for z in dirs do self.Post z }) |> Async.Ignore
              return! loop() }
    loop())

crawler.Post @"C:\Projects"

printfn "Done" // Message getting fired right away, due to the async stuff.

Теперь, как бы я сказал, когда folderCollector, fileCollector и crawler выполнены, так что оператор printfn в конце будет вызван ПОСЛЕ того, как сканер успешно просканирует все подкаталоги и напечатает все файлы?

Обновление: Используя технику, показанную Томасом Петричеком в http://tomasp.net/blog/parallel-extra-image-pipeline.aspx,, мне удалось создать следующий код:

let folders = new BlockingQueueAgent<string>(100)
let files = new BlockingQueueAgent<string>(100)

let rec folderCollector path =
  async { do! folders.AsyncAdd(path)
          do! Async.StartChild(
                  async { let! dirs = Directory.AsyncGetDirectories path
                          for z in dirs do
                            do! folderCollector z }) |> Async.Ignore }

let fileCollector =
  async { while true do
            let! dir = folders.AsyncGet()
            do! Async.StartChild(
                    async { let! fs = Directory.AsyncGetFiles dir
                            for z in fs do
                              do! files.AsyncAdd z }) |> Async.Ignore }

let rec printFiles() =
  async { let! file = files.AsyncTryGet(75)
          match file with
          | Some s -> 
            printfn "%s" s
            return! displayFiles()
          | None -> () }

let cts = new CancellationTokenSource()
Async.Start(folderCollector @"C:\Projects", cts.Token)
Async.Start(fileCollector, cts.Token)
Async.RunSynchronously(printFiles(), cancellationToken = cts.Token)

printfn "DONE!"

Обновление: Обновление: Хорошо, такЯ перепутал следующий код:

let folders = new BlockingQueueAgent<string option>(10)
let files = new BlockingQueueAgent<string option>(10)

let folderCollector path =
  async { let rec loop path = 
            async { do! folders.AsyncAdd(Some path)
                    let! dirs = Directory.AsyncGetDirectories path
                    do! [ for z in dirs -> loop z ] |> Async.Parallel |> Async.Ignore } 
          do! loop path 
          do! folders.AsyncAdd(None) }

let rec fileCollector() =
  async { let! dir = folders.AsyncGet 125
          match dir with
          | Some s -> 
            let fs = Directory.GetFiles s
            do! [ for z in fs -> printfn "%s" z; files.AsyncAdd(Some z) ] |> Async.Parallel |> Async.Ignore // <-- Fails silence if files are full
            do! fileCollector() // <-- unreachable
          | None -> printfn "Done!"; ()}

Это выглядит хорошо, а?По какой-то причине в строке do! fileCollector() в функции fileCollector() не будет выполняться, если агент files BlockingQueueAgent заполнен.Вместо этого он теряет молчание.

Однако, если я делаю:

let folderCollector path =
  async { let rec loop path = 
            async { do! folders.AsyncAdd(Some path)
                    let! dirs = Directory.AsyncGetDirectories path
                    do! [ for z in dirs -> loop z ] |> Async.Parallel |> Async.Ignore } 
          do! loop path 
          do! folders.AsyncAdd(None) }

let rec fileCollector() =
  async { let! dir = folders.AsyncGet 75
          match dir with
          | Some s -> 
            let fs = Directory.GetFiles s
            do! Async.StartChild(async { do! [ for z in fs -> printfn "%s" z; files.AsyncAdd(Some z) ] 
                                             |> Async.Parallel |> Async.Ignore } ) |> Async.Ignore
            do! fileCollector()
          | None -> printfn "Done!"; ()}

Это работает просто отлично.Однако теперь я не могу отследить, когда fileCollector выполнен, так как он выполняет кучу асинхронных вычислений, и, следовательно, даже когда он достигает значения «Нет» в очереди, ему все равно придется поработать.Что происходит?


Обновление: Я изменил fileCollector на тот же "стиль", что и folderCollector, но проблема остается.Модифицированная версия:

let fileCollector() =
  async { let rec loop() = 
            async { let! dir = folders.AsyncGet 750
                    match dir with
                    | Some s -> 
                      let! fs = Directory.AsyncGetFiles s
                      do! [ for z in fs -> printfn "%A" z; files.AsyncAdd(Some z) ] 
                            |> Async.Parallel |> Async.Ignore 
                      return! loop()
                    | None -> printfn "Done!"; () }
          do! loop()
          printfn "after" // Never gets this far... 
          do! files.AsyncAdd(None) }

Ответы [ 2 ]

3 голосов
/ 10 августа 2011

Чтобы ответить на ваш второй вопрос (из комментария) об обновленной версии, основанной на конвейерах - я думаю, вы могли бы использовать BlockingQueueAgent<option<string>> и использовать значение None, когда вы закончите генерировать все файлы (тогда значение None распространяться по конвейеру, и вы можете завершить все рабочие процессы, когда они получат None).

Чтобы сделать это, вам нужно изменить folderCollector, чтобы фактически определять, когда он завершает итерацию. Это не проверено, но должно работать следующее (дело в том, что вам нужно дождаться завершения рекурсивного вызова):

let rec folderCollector path =
  let rec loop path = 
    async { do! folders.AsyncAdd(Some path)
            let! dirs = Directory.AsyncGetDirectories path
            do! [ for z in dirs do -> folderCollector z ] 
                |> Async.Parallel |> Async.Ignore }
  async { do! loop path
          do! folders.AsyncAdd(None) }

Все рабочие процессы потенциально могут получить None в результате AsyncGet. Когда это происходит, они должны отправить None следующему работнику в конвейере. Последний может завершиться, когда он получает None:

let rec printFiles() =
  async { let! file = files.AsyncGet(75) // Note - now we use just AsyncGet
          match file with
          | Some s -> 
            printfn "%s" s
            return! displayFiles()
          | None -> () } // Completed processing all files
2 голосов
/ 09 августа 2011

Нет встроенной поддержки для уведомления вас о завершении работы агента F #.Это на самом деле довольно сложно сказать.Агент, даже с пустой очередью, не завершен, потому что он все еще может получать сообщения от других агентов и снова начинать работать.

В вашем примере работа выполняется, когда очереди всех трех агентов пусты,Это можно проверить с помощью CurrentQueueLength.Это не очень хорошее решение, но оно будет работать:

crawler.Post @"C:\Temp"
// Busy waiting until all queues are empty
while crawler.CurrentQueueLength <> 0 || folderCollector.CurrentQueueLength <> 0 ||
      fileCollector.CurrentQueueLength <> 0 do
    System.Threading.Thread.Sleep(10)
printfn "Done"

Я думаю, что лучшим подходом было бы структурирование кода по-другому - вам не нужно использовать агент для рекурсивной обработки каталогадерево.В вашей версии обход каталогов (crawler агент) выполняется параллельно с поиском файлов в папках (folderCollector) и обработкой результатов (fileCollector), поэтому вы по сути реализуете трехшаговый конвейер.

Вы можете проще реализовать конвейеры, используя async с очередями блокировки, используемыми для хранения немедленных результатов обработки.В этой статье показан пример обработки изображений .Я думаю, что такой же подход будет работать для вас тоже.Обнаружение окончания обработки конвейера должно быть проще (после отправки всех входных данных вы можете отправить специальное сообщение, указывающее на завершение, и когда сообщение достигнет конца конвейера, все готово).

Другая альтернатива будетиспользовать асинхронные последовательности , что может быть хорошим шаблоном для такого рода проблем (но в данный момент нет хороших примеров в сети).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...