Безблокировочный способ передачи сигналов между асинхронными - PullRequest
0 голосов
/ 17 мая 2018

Я ищу способ без блокировки сигнализировать между двумя Async с в F #.У меня есть две хвостовые рекурсивные функции async, и я хочу, чтобы одна выполнялась до тех пор, пока другая не будет сигнализирована другой, прежде чем перейти к следующей рекурсии.Я мог бы использовать событие для этого, но похоже, что события .NET используют внутренние блокировки.Единственное решение, которое я нашел до сих пор, - это использование Keyed Events из ntdll.dll, но я бы предпочел решение, которое не требует прямой ссылки на платформенно-зависимую DLL.Есть ли способ, которым я могу использовать System.Threading.Interlocked или другую технику .NET для достижения этой цели?

Вот простой пример того, чего я хочу достичь:

let rec loop1 () =
    async {
        // do work
        // somehow signal loop2
        return! loop1 ()
    }

let rec loop2 state = 
    async {
        // wait for signal from loop1
        // do work
        return! loop2 state  // This would actually be a new state, not the old state
    }

1 Ответ

0 голосов
/ 17 мая 2018

Я взглянул на предложение Сзера смоделировать событие на IVar Хопака и выяснил, как реализовано стандартное событие F #.Объединив два, я придумал это:

open System
open System.Threading

type LockFreeEvent<'args>() =
    let mutable multicast: Handler<'args> = null
    let wait = 
        let spin = SpinWait()
        spin.SpinOnce

    member __.Trigger arg = 
        match multicast with 
        | null -> ()
        | d -> d.Invoke(null, arg) |> ignore

    member __.Publish = 
        {new IEvent<'args> with
            member __.AddHandler handler = 
                let snapshot = multicast
                while snapshot <> Interlocked.CompareExchange<Handler<'args>>(&multicast, Delegate.Combine(multicast, handler) :?> Handler<'args>, snapshot) do
                    wait ()
            member __.RemoveHandler handler =
                let snapshot = multicast
                while snapshot <> Interlocked.CompareExchange(&multicast, Delegate.Remove(multicast, handler) :?> Handler<'args>, snapshot) do 
                    wait ()
            member this.Subscribe observer =
                let handler = new Handler<_>(fun sender args -> observer.OnNext(args))
                (this :?> IEvent<_,_>).AddHandler(handler)
                { new IDisposable with 
                    member __.Dispose() = (this :?> IEvent<_,_>).RemoveHandler(handler) 
                }
        }

Как это выглядит?Я думаю, что это должно реализовать ту же функциональность, что и стандартное событие F #, но без блокировки, если в Delegate.Combine не происходит блокировка.Я думаю, что мне, возможно, придется сделать Trigger также по-другому.

...