Throttle - вызывать функцию не чаще, чем каждые N миллисекунд? - PullRequest
0 голосов
/ 02 июня 2018

Функция callWebServiceAsync в следующей функции будет отклонена, если она вызывается слишком часто.Для завершения может потребоваться несколько миллисекунд до одной минуты.

F #

let doWorkAsync p = async { // doWorkAsync will be called many times
    ....
    let! ret = callWebServiceAsync p' // need to be throttled, high volumn of requestion in a short time will cause blocking
    ....
    let ret' = process ret
    ret'
}

C #

async Task<T> DoWorkAsync(S p) // DoWorkAsync will be called many times
{
    ....
    ret = await CallWebServiceAsync(...); // need to be throttled, high volumn of requestion in a short time will cause blocking
    ....
    return Process(ret);
}

Какрегулировать частоту звонков?Я не уверен, как они обнаруживают вызовы, поэтому лучше вызывать функцию равномерно (без пакетных запросов).

Ответы [ 2 ]

0 голосов
/ 05 июня 2018

Моей первой реакцией будет использование MailboxProcessor.Именно так вы обычно заставляете все вызовы проходить через один шлюз.

Ниже приведена функция throttle, которая будет возвращать асинхронное продолжение не чаще одного раза за промежуток времени.Высокий уровень, он

  1. получает запросы вызова из очереди сообщений (inbox.Receive).Этот запрос содержит канал для возврата результатов.
  2. проверяет, есть ли необходимость отложить от предыдущего запуска, и спит
  3. отмечает время начала текущего вызова (обратите внимание, вы можете поменять это местамии шаг 4, если вы хотите регулировать в зависимости от времени окончания вызовов)
  4. запускает цикл вызывающего абонента (chan.Reply)
  5. для получения другого запроса

Код выглядит следующим образом:

let createThrottler (delay: TimeSpan) =
  MailboxProcessor.Start(fun inbox ->
    let rec loop (lastCallTime: DateTime option) =
      async {
        let! (chan: AsyncReplyChannel<_>) = inbox.Receive()
        let sleepTime =
          match lastCallTime with
          | None -> 0
          | Some time -> int((time - DateTime.Now + delay).TotalMilliseconds)
        if sleepTime > 0 then
          do! Async.Sleep sleepTime
        let lastCallTime = DateTime.Now
        chan.Reply()
        return! loop(Some lastCallTime)
      }
    loop None)

Затем вы можете использовать его следующим образом:

[<EntryPoint>]
let main argv =
  // Dummy implementation of callWebServiceAsync
  let sw = Stopwatch.StartNew()
  let callWebServiceAsync i =
    async {
      printfn "Start %d %d" i sw.ElapsedMilliseconds
      do! Async.Sleep(100)
      printfn "End %d %d" i sw.ElapsedMilliseconds
      return i
    }

  // Create a throttler MailboxProcessor and then the throttled function from that.
  let webServiceThrottler = createThrottler (TimeSpan.FromSeconds 1.)
  let callWebServiceAsyncThrottled i =
    async {
       do! webServiceThrottler.PostAndAsyncReply(id)
       return! callWebServiceAsync i
    }

  // Some tests
  Async.Start(async { let! i = callWebServiceAsyncThrottled 0
                      printfn "0 returned %d" i
                      let! i = callWebServiceAsyncThrottled 1
                      printfn "1 returned %d" i
                      let! i = callWebServiceAsyncThrottled 2
                      printfn "2 returned %d" i })
  Async.Start(callWebServiceAsyncThrottled 3 |> Async.Ignore)
  Async.Start(callWebServiceAsyncThrottled 4 |> Async.Ignore)
  Async.Start(callWebServiceAsyncThrottled 5 |> Async.Ignore)
  Async.Start(callWebServiceAsyncThrottled 6 |> Async.Ignore)
  Console.ReadLine() |> ignore
  0

Если вы запустите это, вы увидите, что он ограничивает ваши звонки в эту службу какжелательно, независимо от того, работаете ли вы параллельно, последовательно или одновременно.

0 голосов
/ 04 июня 2018

Если вы хотите ограничить частоту вызовов чем-то в миллисекундном диапазоне, вам придется использовать вызов Win32, чтобы получить метку времени с более высоким разрешением, чем обычно доступно при использовании System.DateTime.Я бы, вероятно, использовал QueryUnbiasedInterruptTime, чтобы получить время с шагом 100 нс.Затем вы могли бы просто отслеживать последний раз, когда вы сделали вызов, и асинхронно спать, пока не истечет интервал, используя блокировку для синхронизации обновлений с временем последнего вызова:

open System
open System.Runtime.InteropServices
open System.Runtime.Versioning
open System.Threading

// Wrap the Win32 call to get the current time with 1-millisecond resolution
module private Timestamp =

    [<DllImport("kernel32.dll")>]
    [<ResourceExposure(ResourceScope.None)>]
    extern bool QueryUnbiasedInterruptTime (int64& value)

    let inline private queryUnbiasedInterruptTime () =
        let mutable ticks = 0L
        if QueryUnbiasedInterruptTime &ticks
        then Some ticks
        else None

    /// Get the current timestamp in milliseconds
    let get () =
        match queryUnbiasedInterruptTime() with
        | Some ticks -> ticks / 1000L
        | _ -> DateTime.UtcNow.Ticks / TimeSpan.TicksPerMillisecond


// Stub for request and response types
type Request = Request
type Response = Response

// Minimum time between calls (ms) =
let callFrequencyThreshold = 10L

// A wrapper around the call to the service that will throttle the requests
let doWorkAsync : Request -> Async<Response> =
    // Milliseconds since last call to service
    let mutable lastCallTime = 0L

    // Lock to protect the last call time
    let syncRoot = obj()

    // The real call to the service
    let callService request =
        async {
            // Simulate work
            do! Async.Sleep 1000 
            return Response
        }

    // Accept each request and wait until the threshold has elapsed to call the service
    fun request ->
        async {
            let rec run () =
                lock syncRoot <| fun () ->       
                    async {
                        let currentTime = Timestamp.get()
                        if currentTime - lastCallTime > callFrequencyThreshold
                        then lastCallTime <- currentTime
                             return! callService request
                        else do! Async.Sleep <| int (callFrequencyThreshold - (currentTime - lastCallTime))
                             return! run ()
                    }
            return! run ()
        } 

Однако я бы не сталрекомендовать основанный на времени подход к регулированию, если это абсолютно необходимо.Лично я предпочел бы что-то вроде семафора, чтобы ограничить количество одновременных вызовов к сервису.Таким образом, вы можете обеспечить только один вызов службы за раз, если это требуется, или разрешить настраиваемые n вызовы службы за раз в зависимости от среды и т. Д. Это также значительно упрощает код, и вНа мой взгляд, дает более надежную реализацию:

open System.Threading

// Stub for request and response types
type Request = Request
type Response = Response

// A wrapper around the call to the service that will throttle the requests
let doWorkAsync : Request -> Async<Response> = 
    // A semaphore to limit the number of concurrent calls
    let concurrencyLimit = 10
    let semaphore = new SemaphoreSlim(concurrencyLimit, concurrencyLimit)

    // The real call to the service
    let callService request =
        async {
            // Simulate work
            do! Async.Sleep 1000 
            return Response
        }

    // Accept each request, wait for a semaphore token to be available, 
    // then call the service
    fun request ->
        async {
            do! semaphore.WaitAsync() |> Async.AwaitTask
            return! callService request
        }
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...