«Связывание» асинхронных функций в F # - PullRequest
3 голосов
/ 22 июля 2011

Я создал функцию в F # для восстановления исторических данных из Yahoo (классический асинхронный пример для F #):

let getCSV ticker dStart dEnd =
async   {
        let query = getFileUrl ticker dStart dEnd
        let req = WebRequest.Create(query)
        use! resp = req.AsyncGetResponse()
        use stream= resp.GetResponseStream()
        use reader = new StreamReader(stream)
        let content = reader.ReadToEnd()
        let ts = parseData content
        return ts
        }

Теперь я могу запустить эту функцию асинхронно, выполнив следующие действия:

let test=
    ["MSFT";"YHOO"]
    |>List.map (fun x -> getCSV x (DateTime.Parse("01.01.2000")) (DateTime.Parse("01.01.2010")))
    |> Async.Parallel
    |> Async.RunSynchronously

Хорошо, это круто.

Теперь я хотел бы знать, как применить к этому некоторую функцию, которая является историей цен:

Например:

let getReturns (prices:(DateTime *float)list) =
    [for i in 1..(prices.Length-1) -> i]
    |> List.map (fun i ->(fst (List.nth prices i), (snd (List.nth prices i))/(snd (List.nth prices (i-1) )) - 1.0))

Итак, тривиальный способ сделать это:

let test2=
    ["MSFT";"YHOO"]
    |>List.map (fun x -> getCSV x (DateTime.Parse("01.01.2000")) (DateTime.Parse("01.01.2010")))
    |> Async.Parallel
    |> Async.RunSynchronously
    |> Array.map getReturns;;

Однако функция getReturns выполняется после загрузки и анализа каждого файла.

Что я хотел бы знать, так это то, можно ли начать выполнение второй функции, пока идет загрузка: после завершения MSFT не нужно ждать, пока YHOO завершит вычисление ее возврата ...

Я знаю, что могу изменить getCSV, но я хотел бы знать, есть ли способ «связать» функцию getReturn без необходимости изменения ранее написанного модуля ...

Ответы [ 2 ]

10 голосов
/ 22 июля 2011

Обычно я пишу вызов функции непосредственно в асинхронном рабочем процессе. Это в основном вопрос стиля или предпочтений - я думаю, что код, написанный с использованием асинхронных рабочих процессов, как правило, более явный и не использует функции высшего порядка так часто (хотя они все еще иногда полезны):

let test=
    [ for stock in ["MSFT";"YHOO"] ->
        async { let! data = getCSV stock (DateTime(2000, 1, 1)) (DateTime(2010, 1, 1))
                return getReturns data } ]
    |> Async.Parallel
    |> Async.RunSynchronously 

Это означает, что рабочие процессы, выполняемые параллельно, сначала получают данные, а затем вызывают getRteurns для извлечения данных. Затем вся операция распараллеливается.

В качестве альтернативы, вы можете использовать решение Джоэла (изменить функцию getReturns так, чтобы она брала асинхронный рабочий процесс и возвращал асинхронный рабочий процесс) или определить функцию Async.map, которая принимает асинхронный рабочий процесс и создает новый, который применяется некоторая функция к результату.

Используя исходную функцию getReturns, вы можете написать:

let test=
    ["MSFT";"YHOO"]
    // For every stock name, generate an asynchronous workflow
    |> List.map (fun x -> getCSV x (DateTime(2000, 1, 1)) (DateTime(2010, 1, 1)))
    // For every workflow, transform it into a workflow that 
    // applies 'getReturns' to the result of the original workflow
    |> List.map (Async.map getReturns)
    // Run them all in parallel
    |> Async.Parallel
    |> Async.RunSynchronously

Определение Async.map довольно просто:

module Async =
  let map f workflow = async {
    let! res = workflow
    return f res }
3 голосов
/ 22 июля 2011

Если вы определили свою функцию getReturns следующим образом ...

let getReturns (prices:Async<(DateTime * float) list>) = async {
    let! prices = prices
    return [for i in 1..(prices.Length-1) -> i]
           |> List.map (fun i ->(fst (List.nth prices i), (snd (List.nth prices i))/(snd (List.nth prices (i-1)))))
}

Тогда вы сможете сделать это:

let test=
    ["MSFT";"YHOO"]
    |> List.map (fun x -> getCSV x (DateTime(2000, 1, 1)) (DateTime(2010, 1, 1)))
    |> List.map getReturns
    |> Async.Parallel
    |> Async.RunSynchronously

Вы можете очистить его, изменив getCSV так, чтобы ticker был последним параметром вместо первого. Это позволяет частично применять аргументы даты для создания функции, для выполнения которой требуется только тикер. Затем вы можете связать эту функцию с помощью getReturns.

let test =
    let getRange = getCSV (DateTime(2000, 1, 1)) (DateTime(2010, 1, 1))
    ["MSFT"; "YHOO"]
    |> List.map (getRange >> getReturns)
    |> Async.Parallel
    |> Async.RunSynchronously

Редактировать

Все эти List.nth вызовы в вашей функции getReturns вызывают у меня зуд. Я бы предпочел использовать сопоставление с образцом сам. Я думаю, что вы могли бы написать эту функцию следующим образом:

let getReturns2 (prices: Async<(DateTime * float) list>) = async {
    let! prices = prices
    let rec loop items output =
        match items with
        | (_, last) :: (time, current) :: rest ->
            loop rest ((time, (last / current)) :: output)
        | [ item ] ->
            List.rev (item :: output)
        | [] ->
            List.rev output
    return loop prices []
}
...