Динамически обрабатывать более длительные параллельные задания в F # - PullRequest
0 голосов
/ 27 марта 2020

Я борюсь с правильным подходом для обработки более длинных запросов / заданий в F #.

Требование:

  • Задание состоит из нескольких этапов (которые должны выполняться последовательно).
  • Работа может занять несколько минут, скажем, до 10 минут.
  • Шаг может включать операции ввода-вывода и время ожидания, например, до файлов, созданных шаг обрабатывается другими приложениями, а затем возвращается.
  • Возможно, что шаг завершается неудачно или достигается состояние, при котором задание должно быть завершено досрочно.
  • Должна быть возможность обработки нескольких заданий параллельно.
  • Задания запускаются / добавляются по запросу пользователя.
  • Я хочу иметь возможность отслеживать состояние заданий (текущий шаг, результат предыдущих шагов) по запросу.

Текущее решение:

В настоящее время я использую FileSystemWatcher для отслеживания «входящих» с запросами на работу. В результате запроса добавляется задание в список, которым управляет агент (MailboxProcessor). Как только задание добавляется в список, запускается новый поток (пусть t = new Thread (…) -> t.Start ()) и ссылка на поток сохраняется с параметрами задания (и в списке ). В потоке все шаги выполняются последовательно. Таким образом, я могу отслеживать статус задания (проверить, жив ли поток до сих пор) и обрабатывать задания одновременно.

Однако, похоже, это не позволяет мне получать информацию о шагах в задание / поток.

Желаемое решение:

Кроме того, я хочу переключиться с FileSystemWatcher на REST API на основе Suave. Кажется, что проблема, с которой я сталкиваюсь (параллельное выполнение задания и сбор информации о шагах, передача статуса по запросу), одинакова в обоих мирах (запросы, инициируемые событиями FileSystemWatcher или REST API), но я использую подход REST, чтобы объяснить свой требуемая функциональность:

Я хочу иметь возможность запускать задания (POST) (с ответом: задание принято, идентификатор задания = xyz), проверить состояние заданий (GET с идентификатором задания, ответ, содержащий результаты шага и текущий шаг), и если обработка завершена, получите результат задания (GET с идентификатором задания).

По крайней мере, эта настройка кажется удобной и удовлетворяет текущие потребности.

Может кто-нибудь помогите мне, указав мне правильные инструменты / подход для удовлетворения такого требования? Я совершенно не в том направлении?

Я надеюсь, что объяснение может быть понято и другими, кроме меня.

Спасибо и наилучшими пожеланиями cil

1 Ответ

0 голосов
/ 27 марта 2020

Если Я выполнял ряд требований, подобных этому, тогда я хотел бы взглянуть на следующие инструменты:

Используя базовую рабочую службу. NET, есть C# template dotnet new worker -lang c# -o CSharpService создает C# долго работающую программу. Такой же долго работающий сервис может быть создан в F #.

Для создания проекта необходимо следующее:

dotnet new console -lang F# -o FsharpService
cd FsharpService
dotnet add package Microsoft.Extensions.Hosting
dotnet add package Microsoft.Extensions.Hosting.WindowsServices
dotnet add package System.Net.NameResolution

И затем замените Program.fs на:

open System
open System.Threading.Tasks
open Microsoft.Extensions.DependencyInjection
open Microsoft.Extensions.Hosting
open Microsoft.Extensions.Logging

type Worker(logger : ILogger<Worker>) =
    inherit BackgroundService()
    let _logger = logger
    override bs.ExecuteAsync stoppingToken =
        let f : Async<unit> = async {
            while not stoppingToken.IsCancellationRequested do
                _logger.LogInformation("Worker running at: {time}", DateTime.Now)
                do! Async.Sleep(1000)
        }
        Async.StartAsTask f :> Task

let CreateHostBuilder argv : IHostBuilder =
    let builder = Host.CreateDefaultBuilder(argv)
    builder.UseWindowsService()
        .ConfigureServices(fun hostContext services -> services.AddHostedService<Worker>() 
                                                        |> ignore<IServiceCollection>)
[<EntryPoint>]
let main argv =
    let hostBuilder = CreateHostBuilder argv
    hostBuilder.Build().Run()
    0 // return an integer exit code

И, наконец, если вы находитесь на windows до Создайте реестр и запустите службу:

dotnet publish -r win-x64 -c Release /p:PublishSingleFile=true /p:Trimmed=true -o "./published"
sc create FsharpService binPath= "%cd%\published\FsharpService.exe"
services.msc

Подробнее об этом можно узнать в моем блоге .

Другая технология, на которую я бы посмотрел, была бы Задача параллельной библиотеки . Это позволяет вам создать рабочий процесс, некоторые или все из которых могут быть параллельными, но с параллелизмом и передачей сообщений между блоками. Это просто для вызова из F # и модели, где каждый блок имеет тип ввода и (в некоторых случаях) тип вывода поддается F # проектированию с типами подход.

Вот Простой пример я собрал, когда впервые посмотрел на TPL и F #. NB: у меня не было возможности запустить это и подтвердить, что он все еще работает, также вам нужно будет изменить команду #r для работы на вашем компьютере, если вы попытаетесь использовать ее.

#r @"System.Threading.Tasks.Dataflow.dll"

open System
open System.IO
open System.Threading.Tasks.Dataflow

let buildPropagateLinkOption () =
    let mutable linkOption = new DataflowLinkOptions()
    linkOption.PropagateCompletion <- true
    linkOption

let buildParallelExecutionOption noThreads =
    let mutable executionOption = new ExecutionDataflowBlockOptions()
    executionOption.MaxDegreeOfParallelism <- noThreads
    executionOption

type TPLRequest = {
    path:string ;
    filter:string ;
}

type TPLFile = {
    fileName : string ;
}

type TPLResponse = {
    fileName : string ;
    size : int64 ;
}

let b1Impl (inReq:TPLRequest) : TPLFile seq = 
    printfn "Directory %s %A" inReq.path System.Threading.Thread.CurrentThread.ManagedThreadId
    Directory.EnumerateFiles(inReq.path, inReq.filter) |> Seq.map(fun x -> {fileName = x})

let b2Impl (inReq:TPLFile) : TPLResponse =
    let fInfo = FileInfo(inReq.fileName)
    printfn "File %s %A" inReq.fileName System.Threading.Thread.CurrentThread.ManagedThreadId
    {fileName = inReq.fileName; size = fInfo.Length }

let b3Impl (inReq:TPLResponse) =
    printfn "%s %d %A" inReq.fileName inReq.size System.Threading.Thread.CurrentThread.ManagedThreadId

let buildFlow () =
    let parallelExecutionOption = buildParallelExecutionOption 4
    let b1 = new TransformManyBlock<TPLRequest,TPLFile>((fun x -> b1Impl x),parallelExecutionOption)
    let b2 = new TransformBlock<TPLFile,TPLResponse>((fun x -> b2Impl x),parallelExecutionOption)
    let b3 = new ActionBlock<TPLResponse>((fun x ->b3Impl x),parallelExecutionOption)
    let propagateLinkOption = buildPropagateLinkOption ()
    b1.LinkTo(b2,propagateLinkOption) |> ignore<IDisposable>
    b2.LinkTo(b3,propagateLinkOption) |> ignore<IDisposable>
    b1

let runFlow () =
    let flow = buildFlow ()
    flow.Post {path="C:\\temp"; filter = "*.txt"} |> ignore<bool>
    flow.Post {path="C:\\temp"; filter = "*.zip"} |> ignore<bool>
    flow.Complete()
    flow.Completion.Wait()
    ()

runFlow ()
...