F # - нужна помощь для преобразования этого, чтобы использовать пул потоков - PullRequest
0 голосов
/ 21 марта 2012

Я новичок в F #, и я откровенно использовал приведенный ниже код из различных примеров, которые я нашел в Интернете, чтобы лучше понять, как я могу его использовать. В настоящее время код ниже читает список машин из файла и проверяет связь с каждой из машин. Мне пришлось разделить исходный массив из файла на более мелкие массивы из 25 машин, чтобы контролировать количество одновременных действий, в противном случае для отображения списка машин требуется много времени. Я хотел бы иметь возможность использовать пул потоков для управления потоками, но я не нашел способа заставить его работать. Любое руководство было бы здорово. Я не могу сделать эту работу:

let creatework  = FileLines|> Seq.map (fun elem -> ThreadPool.QueueUserWorkItem(new WaitCallback(dowork), elem))

Вот полный код:

open System.Threading
open System
open System.IO

let filePath = "c:\qa\machines.txt"

let FileLines = File.ReadAllLines(filePath)

let count = FileLines.Length/25

type ProcessResult = { exitCode : int; stdout : string; stderr : string } 

let executeProcess (exe,cmdline) = 
    let psi = new System.Diagnostics.ProcessStartInfo(exe,cmdline) 
    psi.UseShellExecute <- false
    psi.RedirectStandardOutput <- true 
    psi.RedirectStandardError <- true 
    psi.CreateNoWindow <- true
    let p = System.Diagnostics.Process.Start(psi, EnableRaisingEvents = true) 
    let output = new System.Text.StringBuilder()
    let error = new System.Text.StringBuilder() 
    p.OutputDataReceived.Add(fun args -> output.AppendLine(args.Data)|> ignore) 
    p.ErrorDataReceived.Add(fun args -> error.AppendLine(args.Data) |> ignore) 
    p.BeginErrorReadLine() 
    p.BeginOutputReadLine()
    p.WaitForExit()
    { exitCode = p.ExitCode; stdout = output.ToString(); stderr = error.ToString() } 

let dowork machinename=
    async{
        let exeout = executeProcess(@"c:\windows\system32\ping.exe", "-n 1 " + machinename)
        let exelines = 
            if exeout.stdout.Contains("Reply from") then Console.WriteLine(machinename + " " + "REPLY")
            elif exeout.stdout.Contains("Request timed out.") then Console.WriteLine(machinename + " " + "RTO")
            elif exeout.stdout.Contains("Ping request could not find host") then Console.WriteLine(machinename + " " + "Unknown Host")
            else Console.WriteLine(machinename + " " + "ERROR")
        exelines
        }

printfn "%A" (System.DateTime.Now.ToString())

for i in 0..count do
    let x = i*25
    let y = if i = count then FileLines.Length-1 else (i+1)*25
    printfn "%s %d" "X equals: " x
    printfn "%s %d" "Y equals: " y
    let filesection = FileLines.[x..y]
    let creatework = filesection |> Seq.map dowork |> Async.Parallel |> Async.RunSynchronously|>ignore
    creatework

printfn "%A" (System.DateTime.Now.ToString())
printfn "finished"

UPDATE: Код ниже работает и обеспечивает основу для того, что я хочу сделать. Ссылка, на которую ссылался Томас Петричек, содержала кусочки кода, которые сделали эту работу. Я просто должен был понять, какой пример был правильным. Он находится в пределах 3 секунд от дублированного фреймворка, написанного на Java, поэтому я думаю, что я движусь в правильном направлении. Я надеюсь, что приведенный ниже пример будет полезен для всех, кто пытается создать поток для различных исполняемых файлов в F #:

open System
open System.IO
open System.Diagnostics

let filePath = "c:\qa\machines.txt"

let FileLines = File.ReadAllLines(filePath)

type Process with
    static member AsyncStart psi =
        let proc = new Process(StartInfo = psi, EnableRaisingEvents = true)
        let asyncExit = Async.AwaitEvent proc.Exited
        async {
            proc.Start() |> ignore
            let! args = asyncExit
            return proc
        } 

let shellExecute(program : string, args : string) =
    let startInfo =
        new ProcessStartInfo(FileName = program, Arguments = args,
            UseShellExecute = false,
            CreateNoWindow = true,
            RedirectStandardError = true,
            RedirectStandardOutput = true)
    Process.AsyncStart(startInfo)

let dowork (machinename : string)=
    async{
        let nonbtstat = "NONE"
        use! pingout = shellExecute(@"c:\windows\system32\ping.exe", "-n 1 " + machinename)
        let pingRdToEnd = pingout.StandardOutput.ReadToEnd()
        let pingresults =
            if pingRdToEnd.ToString().Contains("Reply from") then (machinename + " " + "REPLY")
            elif pingRdToEnd.ToString().Contains("Request timed out.") then (machinename + " " + "RTO")
            elif pingRdToEnd.ToString().Contains("Ping request could not find host") then (machinename + " " + "Unknown Host")
            else (machinename + " " + "PING_ERROR")
        if pingresults.ToString().Contains("REPLY") then
            use! nbtstatout = shellExecute(@"c:\windows\system32\nbtstat.exe", "-a " + machinename)
            let nbtstatRdToEnd = nbtstatout.StandardOutput.ReadToEnd().Split('\n')
            let nbtstatline = Array.tryFind(fun elem -> elem.ToString().Contains("<00>  UNIQUE      Registered")) nbtstatRdToEnd
            return Console.WriteLine(pingresults + nbtstatline.Value.ToString())
        else return Console.WriteLine(pingresults + " " + nonbtstat)
        }

printfn "%A" (System.DateTime.Now.ToString())

let creatework = FileLines |> Seq.map dowork |> Async.Parallel |> Async.RunSynchronously|>ignore
creatework

printfn "%A" (System.DateTime.Now.ToString())
printfn "finished" 

1 Ответ

6 голосов
/ 21 марта 2012

Основная проблема с вашим кодом заключается в том, что executeProcess - это синхронная функция, выполнение которой занимает много времени (она запускает процесс ping.exe и ожидает его результата). Общее правило состоит в том, что задачи в пуле потоков не должны блокироваться в течение длительного времени (потому что тогда они блокируют потоки пула потоков, что означает, что пул потоков не может эффективно планировать другую работу).

Я думаю, вы можете решить эту проблему довольно просто, сделав executeProcess асинхронным. Вместо вызова WaitForExit (который блокирует) вы можете дождаться события Exitted, используя Async.AwaitEvent:

let executeProcess (exe,cmdline) = async {
    let psi = new System.Diagnostics.ProcessStartInfo(exe,cmdline)  
    psi.UseShellExecute <- false 
    // [Lots of stuff omitted]
    p.BeginOutputReadLine() 
    let! _ = Async.AwaitEvent p.Exited
    return { exitCode = p.ExitCode
             stdout = output.ToString(); stderr = error.ToString() } }

Это должно разблокировать потоки в пуле потоков, и вы сможете использовать Async.Parallel на всех URL-адресах входного массива без какого-либо ручного планирования.

EDIT Как указал @desco в комментарии, вышеупомянутое не совсем верно, если процесс завершается до достижения строки AwaitEvent (до того, как он может пропустить событие). Чтобы исправить это, вам нужно использовать функцию Event.guard, которая обсуждалась в этом вопросе SO:

...