Нужна помощь в отношении Async и FSI - PullRequest
4 голосов
/ 16 апреля 2010

Я хотел бы написать код, который запускает последовательность сценариев F # (.fsx). Дело в том, что у меня может быть буквально сотни сценариев, и если я сделаю это:

let shellExecute program args =
    let startInfo = new ProcessStartInfo()
    do startInfo.FileName        <- program
    do startInfo.Arguments       <- args
    do startInfo.UseShellExecute <- true
    do startInfo.WindowStyle     <- ProcessWindowStyle.Hidden

    //do printfn "%s" startInfo.Arguments 
    let proc = Process.Start(startInfo)
    ()

scripts
|> Seq.iter (shellExecute "fsi")

это может слишком сильно напрягать мою систему на 2 ГБ. В любом случае, я бы хотел запускать скрипты по пакетам из n, что также кажется хорошим упражнением для изучения Async (я думаю, это путь).

Я начал писать код для этого, но, к сожалению, он не работает:

open System.Diagnostics

let p = shellExecute "fsi" @"C:\Users\Stringer\foo.fsx"

async {
    let! exit = Async.AwaitEvent p.Exited
    do printfn "process has exited"
}
|> Async.StartImmediate

foo.fsx - это всего лишь скрипт hello world. Какой самый идиоматичный способ решения этой проблемы?

Я также хотел бы выяснить, выполнимо ли получить код возврата для каждого исполняемого скрипта, и если нет, найти другой способ. Спасибо!

EDIT:

Большое спасибо за ваши идеи и ссылки! Я многому научился. Я просто хочу добавить код для параллельного запуска пакетов, используя Async.Parallel, как предложил Томас. Пожалуйста, прокомментируйте, если есть лучшая реализация для моей cut функции.

module Seq =
  /// Returns a sequence of sequences of N elements from the source sequence.
  /// If the length of the source sequence is not a multiple
  /// of N, last element of the returned sequence will have a length
  /// included between 1 and N-1.
  let cut (count : int) (source : seq<´T>) = 
    let rec aux s length = seq {
      if (length < count) then yield s
      else
        yield Seq.take count s
        if (length <> count) then
          yield! aux (Seq.skip count s) (length - count)
      }
    aux source (Seq.length source)

let batchCount = 2
let filesPerBatch =
  let q = (scripts.Length / batchCount)
  q + if scripts.Length % batchCount = 0 then 0 else 1

let batchs =
  scripts
  |> Seq.cut filesPerBatch
  |> Seq.map Seq.toList
  |> Seq.map loop

Async.RunSynchronously (Async.Parallel batchs) |> ignore

EDIT2:

Так что у меня были некоторые проблемы с тем, чтобы заставить работать защитный код Томаса. Я полагаю, что функцию f нужно было вызывать в методе AddHandler, в противном случае мы теряем событие навсегда ... Вот код:

module Event =
  let guard f (e:IEvent<´Del, ´Args>) = 
    let e = Event.map id e
    { new IEvent<´Args> with 
        member this.AddHandler(d) = e.AddHandler(d); f() //must call f here!
        member this.RemoveHandler(d) = e.RemoveHandler(d); f()
        member this.Subscribe(observer) = 
          let rm = e.Subscribe(observer) in f(); rm }

Интересно (как упомянул Томас), что похоже, что событие Exited сохраняется где-то, когда процесс завершается, даже если процесс не запущен с EnableRaisingEvents, установленным в true. Когда для этого свойства наконец установлено значение true, событие запускается.

Поскольку я не уверен, что это официальная спецификация (а также немного параноик), я нашел другое решение, заключающееся в запуске процесса в функции guard, поэтому мы гарантируем, что код будет работать в зависимости от того, что ситуация:

let createStartInfo program args =
  new ProcessStartInfo
    (FileName = program, Arguments = args, UseShellExecute = false,
     WindowStyle = ProcessWindowStyle.Normal, 
     RedirectStandardOutput = true)

let createProcess info =
  let p = new Process()
  do p.StartInfo           <- info
  do p.EnableRaisingEvents <- true
  p

let rec loop scripts = async { 
  match scripts with 
  | [] -> printfn "FINISHED"
  | script::scripts ->
    let args = sprintf "\"%s\"" script
    let p = createStartInfo "notepad" args |> createProcess
    let! exit =
      p.Exited 
      |> Event.guard (fun () -> p.Start() |> ignore)
      |> Async.AwaitEvent
    let output = p.StandardOutput.ReadToEnd()
    do printfn "\nPROCESSED: %s, CODE: %d, OUTPUT: %A"script p.ExitCode output
    return! loop scripts 
  }

Обратите внимание, что я заменил fsi.exe на notepad.exe , так что я могу пошагово воспроизводить различные сценарии в отладчике и самостоятельно управлять выходом из процесса.

Ответы [ 5 ]

6 голосов
/ 17 апреля 2010

Я провел несколько экспериментов, и вот один из способов решения проблемы, обсуждаемой в комментариях под моим постом и в ответе Джоэла (который, я думаю, в настоящее время не работает, но может быть исправлен).

I думаю, спецификация Process заключается в том, что он может инициировать событие Exited после того, как мы установили для свойства EnableRaisingEvents значение true (и запустит событие, даже если процесс имеет уже завершено, прежде чем мы установим свойство). Чтобы правильно обработать этот случай, нам нужно включить поднятие событий после того, как мы добавим обработчик к событию Exited.

Это проблема, потому что, если мы используем AwaitEvent, она заблокирует рабочий процесс, пока не сработает событие. Мы ничего не можем сделать после вызова AwaitEvent из рабочего процесса (и если мы установим свойство перед вызовом AwaitEvent, тогда мы получим гонку ....). Подход Владимира верен, но я думаю, что есть более простой способ справиться с этим.

Я создам функцию Event.guard, принимающую событие и возвращающую событие, которая позволяет нам указать некоторую функцию, которая будет выполняться после обработчика, прикрепленного к событию. Это означает, что если мы сделаем некоторую операцию (которая, в свою очередь, инициирует событие) внутри этой функции, событие будет обработано.

Чтобы использовать его для обсуждаемой здесь проблемы, нам нужно изменить мое первоначальное решение следующим образом. Во-первых, функция shellExecute не должна устанавливать свойство EnableRaisingEvents (в противном случае мы можем потерять событие!). Во-вторых, код ожидания должен выглядеть следующим образом:

let rec loop scripts = async { 
  match scripts with 
  | [] -> printf "FINISHED"
  | script::scripts ->
    let p = shellExecute fsi script 
    let! exit = 
      p.Exited 
        |> Event.guard (fun () -> p.EnableRaisingEvents <- true)
        |> Async.AwaitEvent
    let output = p.StandardOutput.ReadToEnd()
    return! loop scripts  } 

Обратите внимание на использование функции Event.guard. Грубо говоря, это говорит о том, что после того, как рабочий процесс присоединяет обработчик к событию p.Exited, запускается предоставленная лямбда-функция (и позволяет вызывать события). Тем не менее, мы уже прикрепили обработчик к событию, поэтому, если это вызывает событие немедленно, у нас все хорошо!

Реализация (для Event и Observable) выглядит следующим образом:

module Event =
  let guard f (e:IEvent<'Del, 'Args>) = 
    let e = Event.map id e
    { new IEvent<'Args> with 
        member x.AddHandler(d) = e.AddHandler(d)
        member x.RemoveHandler(d) = e.RemoveHandler(d); f()
        member x.Subscribe(observer) = 
          let rm = e.Subscribe(observer) in f(); rm }

module Observable =
  let guard f (e:IObservable<'Args>) = 
    { new IObservable<'Args> with 
        member x.Subscribe(observer) = 
          let rm = e.Subscribe(observer) in f(); rm }

Хорошо, что этот код очень прост.

5 голосов
/ 16 апреля 2010

Ваш подход мне подходит, мне очень нравится идея встроить выполнение процесса в асинхронные рабочие процессы, используя AwaitEvent!

Вероятная причина того, что это не сработало, заключается в том, что вам нужно установить EnableRisingEvents свойство Process на true, если вы хотите, чтобы оно когда-либо вызывало событие Exited (не спрашивайте, почему Вы должны сделать это, это звучит довольно глупо для меня!) Во всяком случае, я сделал несколько других изменений в вашем коде при тестировании, так что вот версия, которая работала для меня:

open System
open System.Diagnostics

let shellExecute program args = 
  // Configure process to redirect output (so that we can read it)
  let startInfo = 
    new ProcessStartInfo
      (FileName = program, Arguments = args, UseShellExecute = false,
       WindowStyle = ProcessWindowStyle.Hidden, 
       RedirectStandardOutput = true)

  // Start the process
  // Note: We must enable rising events explicitly here!
  Process.Start(startInfo, EnableRaisingEvents = true)

Самое главное, код теперь устанавливает EnableRaisingEvents в true. Я также изменил код, чтобы использовать синтаксис, в котором вы определяете свойства объекта при его создании (чтобы сделать код немного более лаконичным), и я изменил несколько свойств, чтобы я мог прочитать вывод (RedirectStandardOutput).

Теперь мы можем использовать метод AwaitEvent, чтобы дождаться завершения процесса. Я предполагаю, что fsi содержит путь к fsi.exe, а scripts - это список сценариев FSX. Если вы хотите запустить их последовательно, вы можете использовать цикл, реализованный с использованием рекурсии:

let rec loop scripts = async { 
  match scripts with 
  | [] -> printf "FINISHED"
  | script::scripts ->
    // Start the proces in background
    let p = shellExecute fsi script 
    // Wait until the process completes
    let! exit = Async.AwaitEvent p.Exited 
    // Read the output produced by the process, the exit code
    // is available in the `ExitCode` property of `Process`
    let output = p.StandardOutput.ReadToEnd()
    printfn "\nPROCESSED: %s, CODE: %d\n%A" script p.ExitCode output
    // Process the rest of the scripts
    return! loop scripts  } 

// This starts the workflow on background thread, so that we can
// do other things in the meantime. You need to add `ReadLine`, so that
// the console application doesn't quit immedeiately
loop scripts |> Async.Start
Console.ReadLine() |> ignore    

Конечно, вы также можете запускать процессы параллельно (или, например, запускать 2 группы из них параллельно и т. Д.). Для этого вы должны использовать Async.Parallel (обычным способом).

В любом случае, это действительно хороший пример использования асинхронных рабочих процессов в области, где я до сих пор их не использовал. Очень интересно: -)

3 голосов
/ 17 апреля 2010

В ответ на ответ Томаса, будет ли это приемлемым решением для состояния гонки, связанного с запуском процесса и последующей подпиской на событие Exited?

type Process with
    static member AsyncStart psi =
        let proc = new Process(StartInfo = psi, EnableRaisingEvents = true)
        let asyncExit = Async.AwaitEvent proc.Exited
        async {
            proc.Start() |> ignore
            let! args = asyncExit
            return proc
        }

Если я не ошибаюсь, это подпишется на событие до начала процесса и упакует все это как Async<Process> результат.

Это позволит вам переписать остальную часть кода следующим образом:

let shellExecute program args = 
  // Configure process to redirect output (so that we can read it)
  let startInfo = 
    new ProcessStartInfo(FileName = program, Arguments = args, 
        UseShellExecute = false,
        WindowStyle = ProcessWindowStyle.Hidden, 
        RedirectStandardOutput = true)

  // Start the process
  Process.AsyncStart(startInfo)

let fsi = "PATH TO FSI.EXE"

let rec loop scripts = async { 
    match scripts with 
    | [] -> printf "FINISHED"
    | script::scripts ->
        // Start the proces in background
        use! p = shellExecute fsi script 
        // Read the output produced by the process, the exit code
        // is available in the `ExitCode` property of `Process`
        let output = p.StandardOutput.ReadToEnd()
        printfn "\nPROCESSED: %s, CODE: %d\n%A" script p.ExitCode output
        // Process the rest of the scripts
        return! loop scripts 
} 

Если это делает работу, то, конечно, гораздо меньше кода, о котором нужно беспокоиться, чем у Владимира Async.GetSubject.

1 голос
/ 24 июня 2010

Можно упростить версию темы из блога. вместо возврата имитации события getSubject может вернуть рабочий процесс.

Рабочий процесс результата сам по себе является конечным автоматом с двумя состояниями 1. Событие еще не было инициировано: все ожидающие слушатели должны быть зарегистрированы 2. Значение уже установлено, слушатель обслуживается немедленно В коде это будет выглядеть так:

type SubjectState<'T> = Listen of ('T -> unit) list | Value of 'T

Реализация getSubject тривиальна

let getSubject (e : IEvent<_, _>) = 
    let state = ref (Listen [])
    let switchState v = 
        let listeners = 
            lock state (fun () ->
                match !state with
                | Listen ls -> 
                    state := Value v 
                    ls
                | _ -> failwith "Value is set twice"
            )
        for l in listeners do l v

    Async.StartWithContinuations(
        Async.AwaitEvent e,
        switchState,
        ignore,
        ignore
    )

Async.FromContinuations(fun (cont, _, _) ->
    let ok, v = lock state (fun () ->
        match !state with
        | Listen ls ->
            state := Listen (cont::ls)
            false, Unchecked.defaultof<_>
        | Value v ->
            true, v
        )
    if ok then cont v
    )
1 голос
/ 16 апреля 2010

А как насчет процессора почтового ящика?

...